From 8f5020fd9b34b7d188d2c42ccb9ccb14a039497c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 8 Dec 2021 14:04:27 +0100 Subject: [PATCH 1/8] Make peer recovery work with archive data --- .../recovery/PeerRecoveryTargetService.java | 79 ++++++++++--------- .../recovery/RecoverySourceHandler.java | 8 +- .../indices/recovery/RecoveryTarget.java | 8 +- .../xpack/lucene/bwc/codecs/BWCCodec.java | 5 ++ .../SearchableSnapshotIndexEventListener.java | 27 ------- .../qa/repository-old-versions/build.gradle | 1 + .../oldrepos/OldRepositoryAccessIT.java | 24 ++++++ 7 files changed, 86 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 968555e0628b3..67ed724e4d215 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -65,6 +65,7 @@ import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; /** * The recovery target handles recoveries of peer shards of the shard+node to recover to. @@ -283,45 +284,51 @@ public static StartRecoveryRequest getStartRecoveryRequest( logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); Store.MetadataSnapshot metadataSnapshot; - try { - metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. + + if (isSearchableSnapshotStore(recoveryTarget.indexShard().indexSettings().getSettings())) { + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } else { try { - final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); - assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; - } catch (IOException | TranslogCorruptedException e) { - logger.warn( - new ParameterizedMessage( - "error while reading global checkpoint from translog, " - + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the index. + try { + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn( + new ParameterizedMessage( + "error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } + } catch (final org.apache.lucene.index.IndexNotFoundException e) { + // happens on an empty folder. no need to log + assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; + logger.trace("{} shard folder empty, recovering all files", recoveryTarget); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } catch (final IOException e) { + if (startingSeqNo != UNASSIGNED_SEQ_NO) { + logger.warn( + new ParameterizedMessage( + "error while listing local files, resetting the starting sequence number from {} " + + "to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + startingSeqNo = UNASSIGNED_SEQ_NO; + } else { + logger.warn("error while listing local files, recovering as if there are none", e); + } metadataSnapshot = Store.MetadataSnapshot.EMPTY; - startingSeqNo = UNASSIGNED_SEQ_NO; - } - } catch (final org.apache.lucene.index.IndexNotFoundException e) { - // happens on an empty folder. no need to log - assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; - logger.trace("{} shard folder empty, recovering all files", recoveryTarget); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } catch (final IOException e) { - if (startingSeqNo != UNASSIGNED_SEQ_NO) { - logger.warn( - new ParameterizedMessage( - "error while listing local files, resetting the starting sequence number from {} " - + "to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - startingSeqNo = UNASSIGNED_SEQ_NO; - } else { - logger.warn("error while listing local files, recovering as if there are none", e); } - metadataSnapshot = Store.MetadataSnapshot.EMPTY; } logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); request = new StartRecoveryRequest( diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5c1245788360e..6144210ca9684 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -93,6 +93,7 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.common.util.CollectionUtils.concatLists; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; /** * RecoverySourceHandler handles the three phases of shard recovery, which is @@ -210,7 +211,8 @@ public void recoverToTarget(ActionListener listener) { final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + final boolean isSequenceNumberBasedRecovery = + request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) @@ -559,11 +561,13 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); + Store.MetadataSnapshot targetMetadata = isSearchableSnapshotStore(shard.indexSettings().getSettings()) ? + recoverySourceMetadata : request.metadataSnapshot(); recoveryPlannerService.computeRecoveryPlan( shard.shardId(), shardStateIdentifier, recoverySourceMetadata, - request.metadataSnapshot(), + targetMetadata, startingSeqNo, translogOps.getAsInt(), getRequest().targetNode().getVersion(), diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 91fe52167627d..b66669214f44e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; + /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of * this class are created through {@link RecoveriesCollection}. @@ -486,7 +488,11 @@ public void cleanFiles( final Store store = store(); store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); + if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings())) { + indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard); + } else { + store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); + } final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), globalCheckpoint, diff --git a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/codecs/BWCCodec.java b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/codecs/BWCCodec.java index 7d25366b20114..ee31589514736 100644 --- a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/codecs/BWCCodec.java +++ b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/codecs/BWCCodec.java @@ -31,6 +31,7 @@ import org.apache.lucene.index.Terms; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.xpack.lucene.bwc.codecs.lucene70.BWCLucene70Codec; import java.io.IOException; @@ -169,6 +170,10 @@ public void write(Directory directory, SegmentInfo segmentInfo, String segmentSu private static FieldInfos filterFields(FieldInfos fieldInfos) { List fieldInfoCopy = new ArrayList<>(fieldInfos.size()); for (FieldInfo fieldInfo : fieldInfos) { + // omit sequence number field so that it doesn't interfere with peer recovery + if (fieldInfo.name.equals(SeqNoFieldMapper.NAME)) { + continue; + } fieldInfoCopy.add( new FieldInfo( fieldInfo.name, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java index aaa90997eb6b4..9186d2e5d3e5a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.action.StepListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; @@ -18,20 +17,15 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService; import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory; -import java.nio.file.Path; - import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; @@ -65,7 +59,6 @@ public SearchableSnapshotIndexEventListener( public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); ensureSnapshotIsLoaded(indexShard); - associateNewEmptyTranslogWithIndex(indexShard); } private static void ensureSnapshotIsLoaded(IndexShard indexShard) { @@ -93,26 +86,6 @@ private static void ensureSnapshotIsLoaded(IndexShard indexShard) { : "loading snapshot must not be called twice unless we are retrying a peer recovery"; } - private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) { - final ShardId shardId = indexShard.shardId(); - assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId; - if (indexShard.routingEntry().primary() - && indexShard.routingEntry().recoverySource().getType().equals(RecoverySource.Type.SNAPSHOT)) { - // translog initialization is done later in the restore step - return; - } - try { - final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); - final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - final long primaryTerm = indexShard.getPendingPrimaryTerm(); - final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); - final Path translogLocation = indexShard.shardPath().resolveTranslog(); - Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null); - } catch (Exception e) { - throw new TranslogException(shardId, "failed to associate a new translog", e); - } - } - @Override public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) { if (shouldEvictCacheFiles(reason)) { diff --git a/x-pack/qa/repository-old-versions/build.gradle b/x-pack/qa/repository-old-versions/build.gradle index fce544839e21a..725d05ca60eae 100644 --- a/x-pack/qa/repository-old-versions/build.gradle +++ b/x-pack/qa/repository-old-versions/build.gradle @@ -71,6 +71,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { def testClusterProvider = testClusters.register(clusterName) { testDistribution = 'DEFAULT' + numberOfNodes = 2 setting 'path.repo', repoLocation setting 'xpack.license.self_generated.type', 'trial' diff --git a/x-pack/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java b/x-pack/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java index 5e6b44b0243df..bed58999658e1 100644 --- a/x-pack/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java +++ b/x-pack/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java @@ -10,6 +10,7 @@ import org.apache.http.HttpHost; import org.elasticsearch.Build; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; @@ -26,6 +27,8 @@ import org.elasticsearch.client.indices.CloseIndexRequest; import org.elasticsearch.client.searchable_snapshots.MountSnapshotRequest; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -211,6 +214,16 @@ private void restoreMountAndVerify(int numDocs, Set expectedIds, RestHig assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().totalShards()); assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().successfulShards()); + assertEquals( + ClusterHealthStatus.GREEN, + client.cluster() + .health( + new ClusterHealthRequest("restored_test").waitForGreenStatus().waitForNoRelocatingShards(true), + RequestOptions.DEFAULT + ) + .getStatus() + ); + // run a search against the index assertDocs("restored_test", numDocs, expectedIds, client); @@ -219,6 +232,7 @@ private void restoreMountAndVerify(int numDocs, Set expectedIds, RestHig .mountSnapshot( new MountSnapshotRequest("testrepo", "snap1", "test").storage(MountSnapshotRequest.Storage.FULL_COPY) .renamedIndex("mounted_full_copy_test") + .indexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()) .waitForCompletion(true), RequestOptions.DEFAULT ); @@ -226,6 +240,16 @@ private void restoreMountAndVerify(int numDocs, Set expectedIds, RestHig assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().totalShards()); assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().successfulShards()); + assertEquals( + ClusterHealthStatus.GREEN, + client.cluster() + .health( + new ClusterHealthRequest("mounted_full_copy_test").waitForGreenStatus().waitForNoRelocatingShards(true), + RequestOptions.DEFAULT + ) + .getStatus() + ); + // run a search against the index assertDocs("mounted_full_copy_test", numDocs, expectedIds, client); From 3383a62554ab71ebd035b5044fb8d8e70df526cd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 8 Dec 2021 14:40:56 +0100 Subject: [PATCH 2/8] spoteless --- .../indices/recovery/RecoverySourceHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6144210ca9684..c8e8088bc67e5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -211,8 +211,7 @@ public void recoverToTarget(ActionListener listener) { final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = - request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) @@ -561,8 +560,9 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); - Store.MetadataSnapshot targetMetadata = isSearchableSnapshotStore(shard.indexSettings().getSettings()) ? - recoverySourceMetadata : request.metadataSnapshot(); + Store.MetadataSnapshot targetMetadata = isSearchableSnapshotStore(shard.indexSettings().getSettings()) + ? recoverySourceMetadata + : request.metadataSnapshot(); recoveryPlannerService.computeRecoveryPlan( shard.shardId(), shardStateIdentifier, From 8af1fdbe1afeeb642a39a7160355e002cd31b0bd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 8 Dec 2021 15:55:19 +0100 Subject: [PATCH 3/8] fix test --- .../indices/recovery/RecoverySourceHandlerTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 39efba2244aab..a81a379cf42a1 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -252,6 +252,7 @@ public void testSendSnapshotSendsOps() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); final List operations = new ArrayList<>(); final int initialNumberOfDocs = randomIntBetween(10, 1000); for (int i = 0; i < initialNumberOfDocs; i++) { @@ -327,6 +328,7 @@ public void testSendSnapshotStopOnError() throws Exception { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); final List ops = new ArrayList<>(); for (int numOps = between(1, 256), i = 0; i < numOps; i++) { final Engine.Index index = getIndex(Integer.toString(i)); @@ -387,6 +389,7 @@ public void indexTranslogOperations( public void testSendOperationsConcurrently() throws Throwable { final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Set receivedSeqNos = ConcurrentCollections.newConcurrentSet(); long maxSeenAutoIdTimestamp = randomBoolean() ? -1 : randomNonNegativeLong(); long maxSeqNoOfUpdatesOrDeletes = randomBoolean() ? -1 : randomNonNegativeLong(); @@ -749,6 +752,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { final IndexShard shard = mock(IndexShard.class); final AtomicBoolean freed = new AtomicBoolean(true); when(shard.isRelocatedPrimary()).thenReturn(false); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); doAnswer(invocation -> { freed.set(false); ((ActionListener) invocation.getArguments()[0]).onResponse(() -> freed.set(true)); @@ -919,6 +923,7 @@ public void testCancelRecoveryDuringPhase1() throws Exception { Store store = newStore(createTempDir("source"), false); IndexShard shard = mock(IndexShard.class); when(shard.store()).thenReturn(store); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -1067,6 +1072,7 @@ public void testRecoveryPlannerServiceIsUsed() throws Exception { try (Store store = newStore(createTempDir("source"), false)) { IndexShard shard = mock(IndexShard.class); when(shard.store()).thenReturn(store); + when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); From 5a164e7de2e8c29d9a1ca32def828b00da7fabcc Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 13 Dec 2021 10:38:27 +0100 Subject: [PATCH 4/8] comment + canUseSnapshots = false --- .../recovery/RecoverySourceHandler.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c8e8088bc67e5..d8496b9a3f1dd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -559,10 +559,20 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); - final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); - Store.MetadataSnapshot targetMetadata = isSearchableSnapshotStore(shard.indexSettings().getSettings()) - ? recoverySourceMetadata - : request.metadataSnapshot(); + final Store.MetadataSnapshot targetMetadata; + final boolean canUseSnapshots; + if (isSearchableSnapshotStore(shard.indexSettings().getSettings())) { + // When using searchable snapshots, skip the file copy process as the target node already knows how to retrieve those + // files; skipping the file copy process is done by setting sourceMetadata == targetMetadata. + // We can't fully skip phase 1, as we still want to run the cleanFiles step to prepare the files on the target node + targetMetadata = recoverySourceMetadata; + // Set canUseSnapshots to false to avoid scanning the snapshot repository for available snapshots as + // we already know that there is a snapshot available and the target node knows how to retrieve the files. + canUseSnapshots = false; + } else { + targetMetadata = request.metadataSnapshot(); + canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); + } recoveryPlannerService.computeRecoveryPlan( shard.shardId(), shardStateIdentifier, From 46373dde4dd52a6f33c229f7b0db2adb56ff336e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 13 Dec 2021 17:48:54 +0100 Subject: [PATCH 5/8] different approach --- .../index/shard/StoreRecovery.java | 12 ++- .../recovery/PeerRecoveryTargetService.java | 90 ++++++++++--------- .../recovery/RecoverySourceHandler.java | 18 +--- .../indices/recovery/RecoveryTarget.java | 8 +- 4 files changed, 59 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 22a9d40cab600..7604cfcfcdd43 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -53,12 +53,13 @@ import static org.elasticsearch.common.lucene.Lucene.indexWriterConfigWithNoMerging; import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; /** * This package private utility class encapsulates the logic to recover an index shard from either an existing index on * disk or from a snapshot in a repository. */ -final class StoreRecovery { +public final class StoreRecovery { private final Logger logger; private final ShardId shardId; @@ -549,14 +550,17 @@ private void restore( } } - private void bootstrap(final IndexShard indexShard, final Store store) throws IOException { - store.bootstrapNewHistory(); + public static void bootstrap(final IndexShard indexShard, final Store store) throws IOException { + if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) == false) { + // not bootstrapping new history for searchable snapshots (which are read-only) allows sequence-number based peer recoveries + store.bootstrapNewHistory(); + } final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), localCheckpoint, - shardId, + indexShard.shardId(), indexShard.getPendingPrimaryTerm() ); store.associateIndexWithNewTranslog(translogUUID); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 67ed724e4d215..9403661e1ce1f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.shard.StoreRecovery; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -227,6 +228,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); + if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings())) { + // for searchable snapshots, peer recovery is treated similarly to recovery from snapshot + indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard); + final Store store = indexShard.store(); + store.incRef(); + try { + StoreRecovery.bootstrap(indexShard, store); + } finally { + store.decRef(); + } + } final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; @@ -284,51 +296,45 @@ public static StartRecoveryRequest getStartRecoveryRequest( logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); Store.MetadataSnapshot metadataSnapshot; - - if (isSearchableSnapshotStore(recoveryTarget.indexShard().indexSettings().getSettings())) { - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - startingSeqNo = UNASSIGNED_SEQ_NO; - } else { + try { + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. try { - metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the index. - try { - final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); - assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; - } catch (IOException | TranslogCorruptedException e) { - logger.warn( - new ParameterizedMessage( - "error while reading global checkpoint from translog, " - + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - startingSeqNo = UNASSIGNED_SEQ_NO; - } - } catch (final org.apache.lucene.index.IndexNotFoundException e) { - // happens on an empty folder. no need to log - assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; - logger.trace("{} shard folder empty, recovering all files", recoveryTarget); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } catch (final IOException e) { - if (startingSeqNo != UNASSIGNED_SEQ_NO) { - logger.warn( - new ParameterizedMessage( - "error while listing local files, resetting the starting sequence number from {} " - + "to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - startingSeqNo = UNASSIGNED_SEQ_NO; - } else { - logger.warn("error while listing local files, recovering as if there are none", e); - } + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn( + new ParameterizedMessage( + "error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; } + } catch (final org.apache.lucene.index.IndexNotFoundException e) { + // happens on an empty folder. no need to log + assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; + logger.trace("{} shard folder empty, recovering all files", recoveryTarget); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } catch (final IOException e) { + if (startingSeqNo != UNASSIGNED_SEQ_NO) { + logger.warn( + new ParameterizedMessage( + "error while listing local files, resetting the starting sequence number from {} " + + "to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + startingSeqNo = UNASSIGNED_SEQ_NO; + } else { + logger.warn("error while listing local files, recovering as if there are none", e); + } + metadataSnapshot = Store.MetadataSnapshot.EMPTY; } logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); request = new StartRecoveryRequest( diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d8496b9a3f1dd..5c1245788360e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -93,7 +93,6 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.common.util.CollectionUtils.concatLists; -import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; /** * RecoverySourceHandler handles the three phases of shard recovery, which is @@ -559,25 +558,12 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); - final Store.MetadataSnapshot targetMetadata; - final boolean canUseSnapshots; - if (isSearchableSnapshotStore(shard.indexSettings().getSettings())) { - // When using searchable snapshots, skip the file copy process as the target node already knows how to retrieve those - // files; skipping the file copy process is done by setting sourceMetadata == targetMetadata. - // We can't fully skip phase 1, as we still want to run the cleanFiles step to prepare the files on the target node - targetMetadata = recoverySourceMetadata; - // Set canUseSnapshots to false to avoid scanning the snapshot repository for available snapshots as - // we already know that there is a snapshot available and the target node knows how to retrieve the files. - canUseSnapshots = false; - } else { - targetMetadata = request.metadataSnapshot(); - canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); - } + final boolean canUseSnapshots = useSnapshots && request.canDownloadSnapshotFiles(); recoveryPlannerService.computeRecoveryPlan( shard.shardId(), shardStateIdentifier, recoverySourceMetadata, - targetMetadata, + request.metadataSnapshot(), startingSeqNo, translogOps.getAsInt(), getRequest().targetNode().getVersion(), diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b66669214f44e..91fe52167627d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -50,8 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore; - /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of * this class are created through {@link RecoveriesCollection}. @@ -488,11 +486,7 @@ public void cleanFiles( final Store store = store(); store.incRef(); try { - if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings())) { - indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard); - } else { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); - } + store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), globalCheckpoint, From 4a2cd8c97c83acef966a0fd8902fdf95beec489c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 13 Dec 2021 17:54:24 +0100 Subject: [PATCH 6/8] revert changes no longer necessary --- .../indices/recovery/RecoverySourceHandlerTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index a81a379cf42a1..39efba2244aab 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -252,7 +252,6 @@ public void testSendSnapshotSendsOps() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); final List operations = new ArrayList<>(); final int initialNumberOfDocs = randomIntBetween(10, 1000); for (int i = 0; i < initialNumberOfDocs; i++) { @@ -328,7 +327,6 @@ public void testSendSnapshotStopOnError() throws Exception { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); final List ops = new ArrayList<>(); for (int numOps = between(1, 256), i = 0; i < numOps; i++) { final Engine.Index index = getIndex(Integer.toString(i)); @@ -389,7 +387,6 @@ public void indexTranslogOperations( public void testSendOperationsConcurrently() throws Throwable { final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Set receivedSeqNos = ConcurrentCollections.newConcurrentSet(); long maxSeenAutoIdTimestamp = randomBoolean() ? -1 : randomNonNegativeLong(); long maxSeqNoOfUpdatesOrDeletes = randomBoolean() ? -1 : randomNonNegativeLong(); @@ -752,7 +749,6 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { final IndexShard shard = mock(IndexShard.class); final AtomicBoolean freed = new AtomicBoolean(true); when(shard.isRelocatedPrimary()).thenReturn(false); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); doAnswer(invocation -> { freed.set(false); ((ActionListener) invocation.getArguments()[0]).onResponse(() -> freed.set(true)); @@ -923,7 +919,6 @@ public void testCancelRecoveryDuringPhase1() throws Exception { Store store = newStore(createTempDir("source"), false); IndexShard shard = mock(IndexShard.class); when(shard.store()).thenReturn(store); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -1072,7 +1067,6 @@ public void testRecoveryPlannerServiceIsUsed() throws Exception { try (Store store = newStore(createTempDir("source"), false)) { IndexShard shard = mock(IndexShard.class); when(shard.store()).thenReturn(store); - when(shard.indexSettings()).thenReturn(INDEX_SETTINGS); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); From 2c232286d94c0769796722f02ed5835d11fe269d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 13 Dec 2021 22:22:45 +0100 Subject: [PATCH 7/8] fix snapshot of searchable snapshot --- .../blobstore/BlobStoreRepository.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5c7898716660d..c0879cc053bce 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2674,30 +2674,23 @@ public void snapshotShard(SnapshotShardContext context) { long indexIncrementalSize = 0; long indexTotalFileSize = 0; final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); - // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files - // in the commit with files already in the repository - if (filesFromSegmentInfos == null) { + + if (isSearchableSnapshotStore(store.indexSettings().getSettings())) { + indexCommitPointFiles = new ArrayList<>(); + } else if (filesFromSegmentInfos == null) { + // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files + // in the commit with files already in the repository indexCommitPointFiles = new ArrayList<>(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; - if (isSearchableSnapshotStore(store.indexSettings().getSettings())) { - fileNames = Collections.emptyList(); - metadataFromStore = Store.MetadataSnapshot.EMPTY; - } else { - try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) { - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - try { - logger.trace( - "[{}] [{}] Loading store metadata using index commit [{}]", - shardId, - snapshotId, - snapshotIndexCommit - ); - metadataFromStore = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); - } + try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) { + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + try { + logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); + metadataFromStore = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } } for (String fileName : fileNames) { From 396cadc184fd59c73924b56ef94b919083084ab4 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 14 Dec 2021 08:30:05 +0100 Subject: [PATCH 8/8] empty list --- .../repositories/blobstore/BlobStoreRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c0879cc053bce..ad3464cd8b197 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2676,7 +2676,7 @@ public void snapshotShard(SnapshotShardContext context) { final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); if (isSearchableSnapshotStore(store.indexSettings().getSettings())) { - indexCommitPointFiles = new ArrayList<>(); + indexCommitPointFiles = Collections.emptyList(); } else if (filesFromSegmentInfos == null) { // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files // in the commit with files already in the repository