From 406d4def86c752d664c0bf8b7f64d14e50f0ddd6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 12 Mar 2020 09:44:52 +0100 Subject: [PATCH 1/4] Associate translog with Lucene index commit --- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 7 ++++ .../index/shard/StoreRecovery.java | 40 +++++++++++++++--- .../index/translog/Translog.java | 41 +++++++++++++++---- .../recovery/PeerRecoveryTargetService.java | 19 ++++++++- .../store/SearchableSnapshotDirectory.java | 30 +------------- ...ransportMountSearchableSnapshotAction.java | 2 + 7 files changed, 95 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index fceab21e8279b..762e0c7fe6a78 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -166,6 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.FINAL_PIPELINE, MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, IndexSettings.ON_HEAP_ID_TERMS_INDEX, + IndexSettings.SKIP_FILES_RECOVERY_SETTING, ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 83cbd0a2edd2a..432d77313ca85 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -325,6 +325,13 @@ public final class IndexSettings { public static final Setting FILE_BASED_RECOVERY_THRESHOLD_SETTING = Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope); + /** + * A badly named setting indicating that for some specific shards we skip the files recovery and assume that the + * files are available. + */ + public static final Setting SKIP_FILES_RECOVERY_SETTING = + Setting.boolSetting("index.recovery.skip_files", false, Setting.Property.IndexScope, Property.PrivateIndex); + private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index e6301371520a5..4a5f3e8cee2a6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; @@ -398,7 +399,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); } else if (indexShouldExists) { if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { - store.bootstrapNewHistory(); + if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { + associateTranslogWithIndex(indexShard, store); + } else { + store.bootstrapNewHistory(); + } writeEmptyRetentionLeasesFile(indexShard); } // since we recover from local, just fill the files and size @@ -460,7 +465,14 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov final ActionListener restoreListener = ActionListener.wrap( v -> { final Store store = indexShard.store(); - bootstrap(indexShard, store); + if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { + // shard is restored from a snapshot and we expect the store to already contains the files, + // hence we can skip bootstraping a new history uuid with a new translog, and simply + // associate an empty translog with the existing lucene commit. + associateTranslogWithIndex(indexShard, store); + } else { + bootstrap(indexShard, store); + } assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; writeEmptyRetentionLeasesFile(indexShard); indexShard.openEngineAndRecoverFromTranslog(); @@ -491,9 +503,18 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov indexIdListener.onResponse(indexId); } assert indexShard.getEngineOrNull() == null; - indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), - idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure); - } catch (Exception e) { + indexIdListener.whenComplete( + idx -> { + if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { + logger.debug("[{}] skipping full restore from [{}] [{}]", + shardId, restoreSource.snapshot().getRepository(), restoreSource.snapshot().getSnapshotId()); + restoreListener.onResponse(null); + } else { + repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), + idx, snapshotShardId, indexShard.recoveryState(), restoreListener); + } + }, restoreListener::onFailure); + } catch (Exception e) { restoreListener.onFailure(e); } } @@ -506,4 +527,13 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } + + private void associateTranslogWithIndex(final IndexShard indexShard, final Store store) throws IOException { + assert IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings()); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long primaryTerm = indexShard.getPendingPrimaryTerm(); + final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); + Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), shardId, localCheckpoint, primaryTerm, translogUUID, null); + } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index f0e2b04673a71..bc4d9332c101d 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,6 +23,8 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -1838,20 +1840,41 @@ public static String createEmptyTranslog(final Path location, final long initial static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, ChannelFactory channelFactory, long primaryTerm) throws IOException { + return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory); + } + + public static String createEmptyTranslog(final Path location, + final ShardId shardId, + final long initialGlobalCheckpoint, + final long primaryTerm, + @Nullable final String translogUUID, + @Nullable final ChannelFactory factory) throws IOException { IOUtils.rm(location); Files.createDirectories(location); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1); + + final long generation = 1L; + final long minTranslogGeneration = 1L; + final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open; + final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID(); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); + final Path translogFile = location.resolve(getFilename(generation)); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration); + Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); - final String translogUUID = UUIDs.randomBase64UUID(); - TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, - location.resolve(getFilename(1)), channelFactory, - new ByteSizeValue(10), 1, initialGlobalCheckpoint, - () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, - new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); }); + final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory, + new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint, + () -> { + throw new UnsupportedOperationException(); + }, () -> { + throw new UnsupportedOperationException(); + }, + primaryTerm, + new TragicExceptionHolder(), + seqNo -> { + throw new UnsupportedOperationException(); + }); writer.close(); - return translogUUID; + return uuid; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 08ec24573c25a..a7061ff3ab4a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; @@ -42,8 +43,10 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; @@ -64,6 +67,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.nio.file.Path; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -174,8 +178,19 @@ private void doRecovery(final long recoveryId) { try { assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); - recoveryTarget.indexShard().prepareForIndexRecovery(); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + final IndexShard indexShard = recoveryTarget.indexShard(); + indexShard.prepareForIndexRecovery(); + if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { + // associate a new empty translog with the last lucene commit, this way the next StartRecoveryRequest + // will see shard files as if they were already on disk + final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long primaryTerm = indexShard.getPendingPrimaryTerm(); + final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); + final Path location = indexShard.shardPath().resolveTranslog(); + Translog.createEmptyTranslog(location, indexShard.shardId(), localCheckpoint, primaryTerm, translogUUID, null); + } + final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 591417f32c833..db7f5597da197 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -5,9 +5,6 @@ */ package org.elasticsearch.index.store; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; @@ -16,13 +13,10 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -35,8 +29,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.LongSupplier; @@ -172,26 +164,6 @@ public static Directory create(RepositoriesService repositories, directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), currentTimeNanosSupplier); } - directory = new InMemoryNoOpCommitDirectory(directory); - - final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(null) - .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setMergePolicy(NoMergePolicy.INSTANCE); - - try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { - final Map userData = new HashMap<>(); - indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); - - final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(), - Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), - shardPath.getShardId(), 0L); - - userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - indexWriter.setLiveCommitData(userData.entrySet()); - indexWriter.commit(); - } - - return directory; + return new InMemoryNoOpCommitDirectory(directory); } - } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index f380436ae585d..92da5283c000f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -95,6 +96,7 @@ private static Settings buildIndexSettings(String repoName, SnapshotId snapshotI .put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID()) .put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId()) .put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY) + .put(IndexSettings.SKIP_FILES_RECOVERY_SETTING.getKey(), true) .put(IndexMetaData.SETTING_BLOCKS_WRITE, true) .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), SearchableSnapshotAllocator.ALLOCATOR_NAME) .build(); From 32df1c4e79ae27eb47fbca05a8d8fb43b0fe04ca Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 12 Mar 2020 14:29:57 +0100 Subject: [PATCH 2/4] Partial revert "Associate translog with Lucene index commit" This partially reverts commit 406d4def --- .../common/settings/IndexScopedSettings.java | 1 - .../elasticsearch/index/IndexSettings.java | 7 ---- .../index/shard/StoreRecovery.java | 40 +++---------------- .../recovery/PeerRecoveryTargetService.java | 19 +-------- ...ransportMountSearchableSnapshotAction.java | 2 - 5 files changed, 7 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 762e0c7fe6a78..fceab21e8279b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -166,7 +166,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.FINAL_PIPELINE, MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, IndexSettings.ON_HEAP_ID_TERMS_INDEX, - IndexSettings.SKIP_FILES_RECOVERY_SETTING, ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 432d77313ca85..83cbd0a2edd2a 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -325,13 +325,6 @@ public final class IndexSettings { public static final Setting FILE_BASED_RECOVERY_THRESHOLD_SETTING = Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope); - /** - * A badly named setting indicating that for some specific shards we skip the files recovery and assume that the - * files are available. - */ - public static final Setting SKIP_FILES_RECOVERY_SETTING = - Setting.boolSetting("index.recovery.skip_files", false, Setting.Property.IndexScope, Property.PrivateIndex); - private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 4a5f3e8cee2a6..e6301371520a5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; @@ -399,11 +398,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); } else if (indexShouldExists) { if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { - if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { - associateTranslogWithIndex(indexShard, store); - } else { - store.bootstrapNewHistory(); - } + store.bootstrapNewHistory(); writeEmptyRetentionLeasesFile(indexShard); } // since we recover from local, just fill the files and size @@ -465,14 +460,7 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov final ActionListener restoreListener = ActionListener.wrap( v -> { final Store store = indexShard.store(); - if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { - // shard is restored from a snapshot and we expect the store to already contains the files, - // hence we can skip bootstraping a new history uuid with a new translog, and simply - // associate an empty translog with the existing lucene commit. - associateTranslogWithIndex(indexShard, store); - } else { - bootstrap(indexShard, store); - } + bootstrap(indexShard, store); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; writeEmptyRetentionLeasesFile(indexShard); indexShard.openEngineAndRecoverFromTranslog(); @@ -503,18 +491,9 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov indexIdListener.onResponse(indexId); } assert indexShard.getEngineOrNull() == null; - indexIdListener.whenComplete( - idx -> { - if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { - logger.debug("[{}] skipping full restore from [{}] [{}]", - shardId, restoreSource.snapshot().getRepository(), restoreSource.snapshot().getSnapshotId()); - restoreListener.onResponse(null); - } else { - repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), - idx, snapshotShardId, indexShard.recoveryState(), restoreListener); - } - }, restoreListener::onFailure); - } catch (Exception e) { + indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), + idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure); + } catch (Exception e) { restoreListener.onFailure(e); } } @@ -527,13 +506,4 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } - - private void associateTranslogWithIndex(final IndexShard indexShard, final Store store) throws IOException { - assert IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings()); - final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - final long primaryTerm = indexShard.getPendingPrimaryTerm(); - final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); - Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), shardId, localCheckpoint, primaryTerm, translogUUID, null); - } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index a7061ff3ab4a5..08ec24573c25a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; @@ -43,10 +42,8 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; @@ -67,7 +64,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.nio.file.Path; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -178,19 +174,8 @@ private void doRecovery(final long recoveryId) { try { assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); - final IndexShard indexShard = recoveryTarget.indexShard(); - indexShard.prepareForIndexRecovery(); - if (IndexSettings.SKIP_FILES_RECOVERY_SETTING.get(indexShard.indexSettings().getSettings())) { - // associate a new empty translog with the last lucene commit, this way the next StartRecoveryRequest - // will see shard files as if they were already on disk - final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); - final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - final long primaryTerm = indexShard.getPendingPrimaryTerm(); - final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); - final Path location = indexShard.shardPath().resolveTranslog(); - Translog.createEmptyTranslog(location, indexShard.shardId(), localCheckpoint, primaryTerm, translogUUID, null); - } - final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + recoveryTarget.indexShard().prepareForIndexRecovery(); + final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index 92da5283c000f..f380436ae585d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -96,7 +95,6 @@ private static Settings buildIndexSettings(String repoName, SnapshotId snapshotI .put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID()) .put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId()) .put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY) - .put(IndexSettings.SKIP_FILES_RECOVERY_SETTING.getKey(), true) .put(IndexMetaData.SETTING_BLOCKS_WRITE, true) .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), SearchableSnapshotAllocator.ALLOCATOR_NAME) .build(); From 4ab94ce30de70f4b95a6daa67538bb07ab926129 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 12 Mar 2020 16:57:08 +0100 Subject: [PATCH 3/4] Add pre recovery hook --- .../index/CompositeIndexEventListener.java | 13 ++++++ .../index/shard/IndexEventListener.java | 11 +++++ .../elasticsearch/index/shard/IndexShard.java | 7 +++ .../index/shard/StoreRecovery.java | 2 + .../recovery/PeerRecoveryTargetService.java | 6 ++- .../SearchableSnapshotIndexEventListener.java | 44 +++++++++++++++++++ .../SearchableSnapshots.java | 16 ++++++- 7 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 03f15d7a85d8e..334ab4a944e3a 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -257,4 +257,17 @@ public void onStoreClosed(ShardId shardId) { } } } + + @Override + public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSettings indexSettings) { + for (IndexEventListener listener : listeners) { + try { + listener.beforeIndexShardRecovery(indexShard, indexSettings); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to invoke the listener before the shard recovery starts for {}", + indexShard.shardId()), e); + throw e; + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 9cbd9ce4df253..1e664d656b9d0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -167,4 +167,15 @@ default void onStoreCreated(ShardId shardId) {} * @param shardId the shard ID the store belongs to */ default void onStoreClosed(ShardId shardId) {} + + /** + * Called before the index shard starts to recover. + * Note: unlike all other methods in this class, this method is not called using the cluster state update thread. When this method is + * called the shard already transitioned to the RECOVERING state. + * + * @param indexShard the shard that is about to recover + * @param indexSettings the shard's index settings + */ + default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 56707bc198beb..3cd44fd9686d3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1313,6 +1313,13 @@ public void close(String reason, boolean flushEngine) throws IOException { } } + public void preRecovery() { + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); + } + indexEventListener.beforeIndexShardRecovery(this, indexSettings); + } + public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { synchronized (postRecoveryMutex) { // we need to refresh again to expose all operations that were index until now. Otherwise diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index e6301371520a5..e98859975fbae 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -359,6 +359,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi * Recovers the state of the shard from the store. */ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException { + indexShard.preRecovery(); final RecoveryState recoveryState = indexShard.recoveryState(); final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; indexShard.prepareForIndexRecovery(); @@ -449,6 +450,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource, ActionListener listener) { logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); + indexShard.preRecovery(); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source")); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 08ec24573c25a..cb9e23f40bc7a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -171,11 +171,13 @@ private void doRecovery(final long recoveryId) { final RecoveryTarget recoveryTarget = recoveryRef.target(); timer = recoveryTarget.state().getTimer(); cancellableThreads = recoveryTarget.cancellableThreads(); + final IndexShard indexShard = recoveryTarget.indexShard(); try { + indexShard.preRecovery(); assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); - recoveryTarget.indexShard().prepareForIndexRecovery(); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + indexShard.prepareForIndexRecovery(); + final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java new file mode 100644 index 0000000000000..f692b8079b360 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.nio.file.Path; + +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore; + +public class SearchableSnapshotIndexEventListener implements IndexEventListener { + + @Override + public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + associateNewEmptyTranslogWithIndex(indexShard); + } + + private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) { + final ShardId shardId = indexShard.shardId(); + assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId; + try { + final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long primaryTerm = indexShard.getPendingPrimaryTerm(); + final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); + final Path translogLocation = indexShard.shardPath().resolveTranslog(); + Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to associate a new translog", e); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index d7aeedcee78b8..ea95fad971dc8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; @@ -40,8 +41,8 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; +import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction; @@ -125,6 +126,13 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) { repositoriesService.set(repositoriesModule.getRepositoryService()); } + @Override + public void onIndexModule(IndexModule indexModule) { + if (isSearchableSnapshotStore(indexModule.getSettings())) { + indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); + } + } + @Override public Map getDirectoryFactories() { return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, (indexSettings, shardPath) -> { @@ -138,7 +146,7 @@ public Map getDirectoryFactories() { @Override public Optional getEngineFactory(IndexSettings indexSettings) { - if (SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings())) + if (isSearchableSnapshotStore(indexSettings.getSettings()) && indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) { return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity())); } @@ -169,5 +177,9 @@ public List getRestHandlers(Settings settings, RestController restC public Map getExistingShardsAllocators() { return Collections.singletonMap(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator()); } + + static boolean isSearchableSnapshotStore(Settings indexSettings) { + return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)); + } } From 557d924f02ea84cbd084734a132277706bfead62 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 13 Mar 2020 10:04:35 +0100 Subject: [PATCH 4/4] Feedback --- .../elasticsearch/index/shard/IndexShard.java | 4 +--- .../elasticsearch/index/translog/Translog.java | 17 +++++++++++++++++ .../recovery/PeerRecoveryTargetService.java | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3cd44fd9686d3..e70837f5eb62f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1314,9 +1314,7 @@ public void close(String reason, boolean flushEngine) throws IOException { } public void preRecovery() { - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } + assert state == IndexShardState.RECOVERING : "expected a recovering shard " + shardId + " but got " + state; indexEventListener.beforeIndexShardRecovery(this, indexSettings); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index bc4d9332c101d..07cc7b1ffdb2c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1843,6 +1843,23 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory); } + /** + * Creates a new empty translog within the specified {@code location} that contains the given {@code initialGlobalCheckpoint}, + * {@code primaryTerm} and {@code translogUUID}. + * + * This method should be used directly under specific circumstances like for shards that will see no indexing. Specifying a non-unique + * translog UUID could cause a lot of issues and that's why in all (but one) cases the method + * {@link #createEmptyTranslog(Path, long, ShardId, long)} should be used instead. + * + * @param location a {@link Path} to the directory that will contains the translog files (translog + translog checkpoint) + * @param shardId the {@link ShardId} + * @param initialGlobalCheckpoint the global checkpoint to initialize the translog with + * @param primaryTerm the shard's primary term to initialize the translog with + * @param translogUUID the unique identifier to initialize the translog with + * @param factory a {@link ChannelFactory} used to open translog files + * @return the translog's unique identifier + * @throws IOException if something went wrong during translog creation + */ public static String createEmptyTranslog(final Path location, final ShardId shardId, final long initialGlobalCheckpoint, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index cb9e23f40bc7a..44dce3e70e21c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -171,8 +171,8 @@ private void doRecovery(final long recoveryId) { final RecoveryTarget recoveryTarget = recoveryRef.target(); timer = recoveryTarget.state().getTimer(); cancellableThreads = recoveryTarget.cancellableThreads(); - final IndexShard indexShard = recoveryTarget.indexShard(); try { + final IndexShard indexShard = recoveryTarget.indexShard(); indexShard.preRecovery(); assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());