Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,12 @@ public static Index resolveIndex(String index) {
return new Index(index, uuid);
}

public static String resolveCustomDataPath(String index) {
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(index).get();
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
return getIndexResponse.getSettings().get(index).get(IndexMetadata.SETTING_DATA_PATH);
}

public static boolean inFipsJvm() {
return Boolean.parseBoolean(System.getProperty(FIPS_SYSPROP));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
private final Set<String> excludedFileTypes;
private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
private final Path cacheDir;
private final ShardPath shardPath;
private final AtomicBoolean closed;

// volatile fields are updated once under `this` lock, all together, iff loaded is not true.
Expand All @@ -130,6 +132,7 @@ public SearchableSnapshotDirectory(
LongSupplier currentTimeNanosSupplier,
CacheService cacheService,
Path cacheDir,
ShardPath shardPath,
ThreadPool threadPool
) {
super(new SingleInstanceLockFactory());
Expand All @@ -142,6 +145,7 @@ public SearchableSnapshotDirectory(
this.statsCurrentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier);
this.cacheService = Objects.requireNonNull(cacheService);
this.cacheDir = Objects.requireNonNull(cacheDir);
this.shardPath = Objects.requireNonNull(shardPath);
this.closed = new AtomicBoolean(false);
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false;
Expand Down Expand Up @@ -182,6 +186,7 @@ public boolean loadSnapshot() {
this.blobContainer = blobContainerSupplier.get();
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
prewarmCache();
}
}
Expand Down Expand Up @@ -374,6 +379,14 @@ public String toString() {
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
}

private void cleanExistingRegularShardFiles() {
try {
IOUtils.rm(shardPath.resolveIndex(), shardPath.resolveTranslog());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void prewarmCache() {
if (prewarmCache) {
final BlockingQueue<Tuple<ActionListener<Void>, CheckedRunnable<Exception>>> queue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -513,6 +526,7 @@ public static Directory create(
currentTimeNanosSupplier,
cache,
cacheDir,
shardPath,
threadPool
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.cache.TestUtils;
Expand All @@ -28,7 +30,9 @@
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -597,6 +601,14 @@ private static void executeTestCase(
final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion);
final List<FileInfo> files = List.of(new FileInfo(blobName, metadata, new ByteSizeValue(fileContent.length)));
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L);
final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
final Path cacheDir = createTempDir();

try (
CacheService ignored = cacheService;
Expand All @@ -609,7 +621,8 @@ private static void executeTestCase(
indexSettings,
statsCurrentTimeNanos,
cacheService,
createTempDir(),
cacheDir,
shardPath,
threadPool
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.cache.TestUtils;
Expand All @@ -86,6 +88,7 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
Expand Down Expand Up @@ -548,6 +551,13 @@ protected void assertSnapshotOrGenericThread() {
final BlobContainer blobContainer = repository.shardContainer(indexId, shardId.id());
final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(blobContainer, snapshotId);

final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
final Path cacheDir = createTempDir();
final CacheService cacheService = TestUtils.createDefaultCacheService();
releasables.add(cacheService);
Expand All @@ -567,6 +577,7 @@ protected void assertSnapshotOrGenericThread() {
() -> 0L,
cacheService,
cacheDir,
shardPath,
threadPool
)
) {
Expand Down Expand Up @@ -637,6 +648,13 @@ public void testClearCache() throws Exception {
final IndexId indexId = new IndexId("_id", "_uuid");
final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);

final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
final Path cacheDir = createTempDir();
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
try (
Expand All @@ -655,6 +673,7 @@ public void testClearCache() throws Exception {
() -> 0L,
cacheService,
cacheDir,
shardPath,
threadPool
)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.store.StoreFileMetadata;
Expand All @@ -27,6 +29,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashSet;
Expand Down Expand Up @@ -83,6 +86,13 @@ public void testRandomReads() throws IOException {
blobContainer = singleBlobContainer;
}

final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
final Path cacheDir = createTempDir();
try (
SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(
Expand All @@ -98,6 +108,7 @@ public void testRandomReads() throws IOException {
() -> 0L,
cacheService,
cacheDir,
shardPath,
threadPool
)
) {
Expand Down Expand Up @@ -158,6 +169,13 @@ public void testThrowsEOFException() throws IOException {

final BlobContainer blobContainer = singleBlobContainer(blobName, input);
final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
final Path cacheDir = createTempDir();
try (
SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(
Expand All @@ -170,6 +188,7 @@ public void testThrowsEOFException() throws IOException {
() -> 0L,
cacheService,
cacheDir,
shardPath,
threadPool
)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand All @@ -28,9 +29,12 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -44,6 +48,8 @@
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -59,6 +65,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
Expand All @@ -68,6 +75,7 @@
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -125,8 +133,16 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
ensureGreen(indexName);

assertAcked(client().admin().indices().prepareDelete(indexName));
assertShardFolders(indexName, false);

final boolean deletedBeforeMount = randomBoolean();
if (deletedBeforeMount) {
assertAcked(client().admin().indices().prepareDelete(indexName));
} else {
assertAcked(client().admin().indices().prepareClose(indexName));
}

final boolean cacheEnabled = randomBoolean();
logger.info("--> restoring index [{}] with cache [{}]", restoredIndexName, cacheEnabled ? "enabled" : "disabled");
Expand Down Expand Up @@ -187,9 +203,22 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {

assertRecovered(restoredIndexName, originalAllHits, originalBarHits);
assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions);

assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0));
assertAcked(client().admin().indices().prepareAliases().addAlias(restoredIndexName, aliasName));
ensureGreen(restoredIndexName);
assertShardFolders(restoredIndexName, true);

if (deletedBeforeMount) {
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0));
assertAcked(client().admin().indices().prepareAliases().addAlias(restoredIndexName, aliasName));
} else if (indexName.equals(restoredIndexName) == false) {
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1));
assertAcked(
client().admin()
.indices()
.prepareAliases()
.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(aliasName).mustExist(true))
.addAlias(restoredIndexName, aliasName)
);
}
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1));
assertRecovered(aliasName, originalAllHits, originalBarHits, false);

Expand Down Expand Up @@ -275,6 +304,32 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {

}

private void assertShardFolders(String indexName, boolean snapshotDirectory) throws IOException {
final Index restoredIndex = resolveIndex(indexName);
final String customDataPath = resolveCustomDataPath(indexName);
final ShardId shardId = new ShardId(restoredIndex, 0);
boolean shardFolderFound = false;
for (String node : internalCluster().getNodeNames()) {
final NodeEnvironment service = internalCluster().getInstance(NodeEnvironment.class, node);
final ShardPath shardPath = ShardPath.loadShardPath(logger, service, shardId, customDataPath);
if (shardPath != null && Files.exists(shardPath.getDataPath())) {
shardFolderFound = true;
assertEquals(snapshotDirectory, Files.notExists(shardPath.resolveIndex()));

assertTrue(Files.exists(shardPath.resolveTranslog()));
try (Stream<Path> dir = Files.list(shardPath.resolveTranslog())) {
final long translogFiles = dir.filter(path -> path.getFileName().toString().contains("translog")).count();
if (snapshotDirectory) {
assertEquals(2L, translogFiles);
} else {
assertThat(translogFiles, greaterThanOrEqualTo(2L));
}
}
}
}
assertTrue("no shard folder found", shardFolderFound);
}

public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down