Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,13 @@ public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnviron
try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
// Previously we called Lucene#readSegmentInfos which verifies that some Lucene metadata is readable and makes sense, but if it
// weren't then we would mark this shard as corrupt when allocated, so it seems that this is unnecessary (and it breaks when
// the shard's directory is virtual since we use SimpleFSDirectory above.
// TODO NORELEASE is this ok? Need to check that we definitely add a corruption marker if the metadata is corrupt.
// SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
// logger.trace("{} loaded segment info [{}]", shardId, segInfo);
logger.trace("{} tryOpenIndex succeeded", shardId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
repositoriesService = new RepositoriesService(settings, clusterService, transportService, repositoryTypes,
internalRepositoryTypes, threadPool);

repoPlugins.forEach(rp -> rp.onRepositoriesModule(this));
}

public RepositoriesService getRepositoryService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ public void testCanOpenIndex() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
// assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id))); TODO NORELEASE
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Set;

/**
* A {@link Directory} which wraps a read-only "real" directory with a wrapper that allows no-op (in-memory) commits, and peer recoveries
* of the same, so that we can start a shard on a completely readonly data set.
*/
public class InMemoryNoOpCommitDirectory extends FilterDirectory {
private final Directory realDirectory;

InMemoryNoOpCommitDirectory(Directory realDirectory) {
super(new ByteBuffersDirectory(NoLockFactory.INSTANCE));
this.realDirectory = realDirectory;
}

@Override
public String[] listAll() throws IOException {
final String[] ephemeralFiles = in.listAll();
final String[] realFiles = realDirectory.listAll();
final String[] allFiles = new String[ephemeralFiles.length + realFiles.length];
System.arraycopy(ephemeralFiles, 0, allFiles, 0, ephemeralFiles.length);
System.arraycopy(realFiles, 0, allFiles, ephemeralFiles.length, realFiles.length);
return allFiles;
}

@Override
public void deleteFile(String name) throws IOException {
ensureMutable(name);
try {
in.deleteFile(name);
} catch (NoSuchFileException | FileNotFoundException e) {
// cannot delete the segments_N file in the read-only directory, but that's ok, just ignore this
}
}

@Override
public long fileLength(String name) throws IOException {
try {
return in.fileLength(name);
} catch (NoSuchFileException | FileNotFoundException e) {
return realDirectory.fileLength(name);
}
}

@Override
public void sync(Collection<String> names) {
}

@Override
public void syncMetaData() {
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureMutable(name);
return super.createOutput(name, context);
}

@Override
public void rename(String source, String dest) throws IOException {
ensureMutable(source);
ensureMutable(dest);
super.rename(source, dest);
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw new UnsupportedOperationException();
}

@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) {
throw new UnsupportedOperationException();
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
try {
return in.openInput(name, context);
} catch (NoSuchFileException | FileNotFoundException e) {
return realDirectory.openInput(name, context);
}
}

@Override
public void close() throws IOException {
IOUtils.close(in, realDirectory);
}

@Override
public Set<String> getPendingDeletions() throws IOException {
return super.getPendingDeletions(); // read-only realDirectory has no pending deletions
}

private static void ensureMutable(String name) {
if ((name.startsWith("segments_")
|| name.startsWith("pending_segments_")
|| name.matches("^recovery\\..*\\.segments_.*$")) == false) {

throw new IllegalArgumentException("file [" + name + "] is not mutable");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.FilterRepository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
* A repository that wraps a {@link BlobStoreRepository} to add settings to the index metadata during a restore to identify the source
* snapshot and index in order to create a {@link SearchableSnapshotDirectory} (and corresponding empty translog) to search these shards
* without needing to fully restore them.
*/
public class SearchableSnapshotRepository extends FilterRepository {

public static final String TYPE = "searchable";

public static final Setting<String> SNAPSHOT_REPOSITORY_SETTING =
Setting.simpleString("index.store.snapshot.repository_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
public static final Setting<String> SNAPSHOT_SNAPSHOT_NAME_SETTING =
Setting.simpleString("index.store.snapshot.snapshot_name", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
public static final Setting<String> SNAPSHOT_SNAPSHOT_ID_SETTING =
Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
public static final Setting<String> SNAPSHOT_INDEX_ID_SETTING =
Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);

public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";

private static final Setting<String> DELEGATE_TYPE
= new Setting<>("delegate_type", "", Function.identity(), Setting.Property.NodeScope);

private final BlobStoreRepository blobStoreRepository;

public SearchableSnapshotRepository(Repository in) {
super(in);
if (in instanceof BlobStoreRepository == false) {
throw new IllegalArgumentException("Repository [" + in + "] does not support searchable snapshots" );
}
blobStoreRepository = (BlobStoreRepository) in;
}

private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {

IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());

SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()));
BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);

final SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(snapshot, blobContainer);
final InMemoryNoOpCommitDirectory inMemoryNoOpCommitDirectory = new InMemoryNoOpCommitDirectory(searchableSnapshotDirectory);

try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) {
final Map<String, String> userData = new HashMap<>();
indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));

final String translogUUID = Translog.createEmptyTranslog(shardPath.resolveTranslog(),
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
shardPath.getShardId(), 0L);

userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
indexWriter.setLiveCommitData(userData.entrySet());
indexWriter.commit();
}

return inMemoryNoOpCommitDirectory;
}

@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
final IndexMetaData indexMetaData = super.getSnapshotIndexMetaData(snapshotId, index);
final IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
builder.settings(Settings.builder().put(indexMetaData.getSettings()).put(getIndexSettings(blobStoreRepository, snapshotId, index)));
return builder.build();
}

public static Settings getIndexSettings(Repository repository, SnapshotId snapshotId, IndexId indexId) {
return Settings.builder()
.put(SNAPSHOT_REPOSITORY_SETTING.getKey(), repository.getMetadata().name())
.put(SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName())
.put(SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID())
.put(SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId())
.put(INDEX_STORE_TYPE_SETTING.getKey(), SNAPSHOT_DIRECTORY_FACTORY_KEY)
.put(IndexMetaData.SETTING_BLOCKS_WRITE, true)
.build();
}

static Factory getRepositoryFactory() {
return new Repository.Factory() {
@Override
public Repository create(RepositoryMetaData metadata) {
throw new UnsupportedOperationException();
}

@Override
public Repository create(RepositoryMetaData metaData, Function<String, Factory> typeLookup) throws Exception {
String delegateType = DELEGATE_TYPE.get(metaData.settings());
if (Strings.hasLength(delegateType) == false) {
throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set");
}
Repository.Factory factory = typeLookup.apply(delegateType);
return new SearchableSnapshotRepository(factory.create(new RepositoryMetaData(metaData.name(),
delegateType, metaData.settings()), typeLookup));
}
};
}

public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier<RepositoriesService> repositoriesService) {
return (indexSettings, shardPath) -> {
final RepositoriesService repositories = repositoriesService.get();
assert repositories != null;

final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
if (repository instanceof SearchableSnapshotRepository == false) {
throw new IllegalArgumentException("Repository [" + repository + "] is not searchable" );
}

return ((SearchableSnapshotRepository)repository).makeDirectory(indexSettings, shardPath);
};
}
}
Loading