diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java similarity index 60% rename from x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java rename to x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index e4e9115393d32..34b657864fa81 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -4,88 +4,41 @@ * you may not use this file except in compliance with the Elastic License. 
*/ -package org.elasticsearch.xpack.searchablesnapshots; +package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.lucene.mockfile.FilterFileSystemProvider; -import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.apache.lucene.document.Document; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import java.io.IOException; import java.nio.file.DirectoryStream; -import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; import java.util.Locale; +import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; -public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase { - - private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider; - - @BeforeClass - public static void installDeleteBlockingFileSystemProvider() { - FileSystem current = PathUtils.getDefaultFileSystem(); - deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current); - PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null)); - } - - @AfterClass - public static void removeDeleteBlockingFileSystemProvider() { - PathUtilsForTesting.teardown(); - } - - void startBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(true); - } - - void stopBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(false); - } - - private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider { - - AtomicBoolean injectFailures = new AtomicBoolean(); - - DeleteBlockingFileSystemProvider(FileSystem inner) { - super("deleteblocking://", inner); - } - - @Override - public boolean deleteIfExists(Path path) throws IOException { - if (injectFailures.get()) { - throw new IOException("blocked deletion of " + path); - } else { - return super.deleteIfExists(path); - } - } - - } +public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - public void testCacheDirectoriesRemovedOnStartup() throws Exception { + public void testCacheSurviveRestart() throws Exception { final 
String fsRepoName = randomAlphaOfLength(10); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); @@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes(); final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName(); - final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( - restoredIndexName, + mountSnapshot( fsRepoName, snapshotName, indexName, - Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(), - Strings.EMPTY_ARRAY, - true + restoredIndexName, + Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build() ); - - final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); - assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); ensureGreen(restoredIndexName); final Index restoredIndex = client().admin() @@ -143,7 +91,9 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { .getIndex(); final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex); - final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath()); + final ShardPath shardPath = indexService.getShard(0).shardPath(); + final Path shardCachePath = CacheService.getShardCachePath(shardPath); + assertTrue(Files.isDirectory(shardCachePath)); final Set cacheFiles = new HashSet<>(); try (DirectoryStream snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) { @@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { } 
assertFalse("no cache files found", cacheFiles.isEmpty()); - startBlockingDeletes(); + CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + PersistentCache persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) { - assertTrue(Files.isDirectory(shardCachePath)); - for (Path cacheFile : cacheFiles) { - assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile)); + try { + assertTrue(Files.isDirectory(shardCachePath)); + + final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath()); + assertTrue(Files.isDirectory(persistentCacheIndexDir)); + + final Map documents = PersistentCache.loadDocuments(persistentCacheIndexDir); + assertThat(documents.size(), equalTo(cacheFiles.size())); + + for (Path cacheFile : cacheFiles) { + final String cacheFileName = cacheFile.getFileName().toString(); + assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile)); + assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue()); + } + } catch (IOException e) { + throw new AssertionError(e); } - stopBlockingDeletes(); return Settings.EMPTY; } }); + persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); ensureGreen(restoredIndexName); - for (Path cacheFile : cacheFiles) { - assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)); - } + cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); 
assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); + + assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)))); + cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo(0L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index c1dd02cb505c8..5793f4d33c5b3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -158,6 +158,11 @@ FileChannel getChannel() { return reference == null ? null : reference.fileChannel; } + // Only used in tests + SortedSet> getCompletedRanges() { + return tracker.getCompletedRanges(); + } + public void acquire(final EvictionListener listener) throws IOException { assert listener != null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index f3f42ff233d7a..f3ede77a0c58d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -32,6 +32,7 @@ import java.nio.file.Path; +import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; import static 
org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; @@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe this.cacheService = cacheService; } + /** + * Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard + * snapshot information that contains the list of shard's Lucene files. + * + * @param indexShard the shard that is about to recover + * @param indexSettings the shard's index settings + */ @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); @@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS } private static void ensureSnapshotIsLoaded(IndexShard indexShard) { - final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory()); + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); assert directory != null; final StepListener preWarmListener = new StepListener<>(); final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 23f16464d1eea..54321adc8b2f2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ 
b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -65,7 +65,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.TransportRepositoryStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestRepositoryStatsAction; @@ -218,12 +218,7 @@ public Collection createComponents( this.threadPool.set(threadPool); this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())); if (DiscoveryNode.isDataNode(settings)) { - final CacheService cacheService = new CacheService( - settings, - clusterService, - threadPool, - new NodeEnvironmentCacheCleaner(nodeEnvironment) - ); + final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment)); this.cacheService.set(cacheService); components.add(cacheService); final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( @@ -234,6 +229,8 @@ public Collection createComponents( ); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); + } else { + PersistentCache.cleanUp(settings, nodeEnvironment); } return Collections.unmodifiableList(components); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java 
b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index b71fd9275e650..a34e67fffb5fe 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -114,9 +114,9 @@ public class CacheService extends AbstractLifecycleComponent { private final CacheSynchronizationTask cacheSyncTask; private final TimeValue cacheSyncStopTimeout; private final ReentrantLock cacheSyncLock; + private final PersistentCache persistentCache; private final Cache cache; private final ByteSizeValue cacheSize; - private final Runnable cacheCleaner; private final ByteSizeValue rangeSize; private final KeyedLock shardsEvictionLock; private final Set evictedShards; @@ -127,11 +127,10 @@ public CacheService( final Settings settings, final ClusterService clusterService, final ThreadPool threadPool, - final Runnable cacheCleaner + final PersistentCache persistentCache ) { this.threadPool = Objects.requireNonNull(threadPool); this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings); - this.cacheCleaner = Objects.requireNonNull(cacheCleaner); this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings); this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) @@ -140,6 +139,7 @@ public CacheService( // are done with reading/writing the cache file .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); + this.persistentCache = Objects.requireNonNull(persistentCache); this.shardsEvictionLock = new KeyedLock<>(); this.evictedShards = ConcurrentCollections.newConcurrentSet(); this.numberOfCacheFilesToSync = new AtomicLong(); @@ -157,14 +157,14 @@ public static Path getShardCachePath(ShardPath shardPath) { return resolveSnapshotCache(shardPath.getDataPath()); } - static Path 
resolveSnapshotCache(Path path) { + public static Path resolveSnapshotCache(Path path) { return path.resolve("snapshot_cache"); } @Override protected void doStart() { + persistentCache.repopulateCache(this); cacheSyncTask.rescheduleIfNecessary(); - cacheCleaner.run(); } @Override @@ -181,10 +181,15 @@ protected void doStop() { logger.warn("interrupted while waiting for cache sync lock", e); } cacheSyncTask.close(); - cache.invalidateAll(); } finally { - if (acquired) { - cacheSyncLock.unlock(); + try { + persistentCache.close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache", e); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } } } } @@ -397,12 +402,18 @@ private void onCacheFileUpdate(CacheFile cacheFile) { /** * This method is invoked after a {@link CacheFile} is evicted from the cache. *

- * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted. + * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted and removes it from the persistent cache. * * @param cacheFile the evicted instance */ private void onCacheFileRemoval(CacheFile cacheFile) { IOUtils.closeWhileHandlingException(cacheFile::startEviction); + try { + persistentCache.removeCacheFile(cacheFile); + } catch (Exception e) { + assert e instanceof IOException : e; + logger.warn("failed to remove cache file from persistent cache", e); + } } // used in tests @@ -410,6 +421,11 @@ boolean isCacheFileToSync(CacheFile cacheFile) { return cacheFilesToSync.contains(cacheFile); } + // used in tests + PersistentCache getPersistentCache() { + return persistentCache; + } + /** * Synchronize the cache files and their parent directories on disk. * @@ -457,23 +473,36 @@ protected void synchronizeCache() { ranges.size() ); final Path cacheDir = cacheFilePath.toAbsolutePath().getParent(); - if (cacheDirs.add(cacheDir)) { + boolean shouldPersist = cacheDirs.contains(cacheDir); + if (shouldPersist == false) { try { - IOUtils.fsync(cacheDir, true, false); + IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed logger.trace("cache directory [{}] synchronized", cacheDir); + cacheDirs.add(cacheDir); + shouldPersist = true; } catch (Exception e) { assert e instanceof IOException : e; + shouldPersist = false; logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e); } } - // TODO Index searchable snapshot shard information + cache file ranges in Lucene - count += 1L; + if (shouldPersist) { + persistentCache.addCacheFile(cacheFile, ranges); + count += 1L; + } } } catch (Exception e) { assert e instanceof IOException : e; logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e); } } + if (count > 0 || persistentCache.hasDeletions()) { + 
try { + persistentCache.commit(); + } catch (IOException e) { + logger.error("failed to commit persistent cache after synchronization", e); + } + } if (logger.isDebugEnabled()) { final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos; logger.debug( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java deleted file mode 100644 index f7edc2c663bb8..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.searchablesnapshots.cache; - -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardPath; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; - -/** - * Cleans any leftover searchable snapshot caches when a node is starting up. 
- */ -public class NodeEnvironmentCacheCleaner implements Runnable { - - private final NodeEnvironment nodeEnvironment; - - public NodeEnvironmentCacheCleaner(NodeEnvironment nodeEnvironment) { - this.nodeEnvironment = nodeEnvironment; - } - - @Override - public void run() { - try { - for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { - for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { - final Path shardDataPath = nodePath.resolve(shardId); - final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); - final Path shardCachePath = CacheService.getShardCachePath(shardPath); - if (Files.isDirectory(shardCachePath)) { - IOUtils.rm(shardCachePath); - } - } - } - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java new file mode 100644 index 0000000000000..21363b3094e14 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -0,0 +1,617 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SerialMergeScheduler; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; + +import 
java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.synchronizedMap; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableSortedSet; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath; + +public class PersistentCache implements Closeable { + + private static final Logger logger = LogManager.getLogger(PersistentCache.class); + + private static final String NODE_VERSION_COMMIT_KEY = "node_version"; + + private final NodeEnvironment nodeEnvironment; + private final Map documents; + private final List writers; + private final AtomicBoolean started; + private final AtomicBoolean closed; + + public PersistentCache(NodeEnvironment nodeEnvironment) { + this.documents = synchronizedMap(loadDocuments(nodeEnvironment)); + this.writers = createWriters(nodeEnvironment); + this.nodeEnvironment = nodeEnvironment; + this.started = new AtomicBoolean(); + this.closed = new AtomicBoolean(); + } + + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("Persistent cache is already closed"); + } + } + + private void ensureStarted() { + if (started.get() == false) { + throw new IllegalStateException("Persistent cache is not started"); + } + } + + /** + * @return the {@link CacheIndexWriter} to use for the given {@link CacheFile} + */ + private CacheIndexWriter getWriter(CacheFile cacheFile) { + ensureOpen(); + if (writers.size() == 1) { + 
return writers.get(0); + } else { + final Path path = cacheFile.getFile().toAbsolutePath(); + return writers.stream() + .filter(writer -> path.startsWith(writer.nodePath().path)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Failed to find a Lucene index for cache file path [" + path + ']')); + } + } + + public void addCacheFile(CacheFile cacheFile, SortedSet> ranges) throws IOException { + ensureStarted(); + getWriter(cacheFile).updateCacheFile(cacheFile, ranges); + } + + public void removeCacheFile(CacheFile cacheFile) throws IOException { + ensureStarted(); + getWriter(cacheFile).deleteCacheFile(cacheFile); + } + + /** + * This method repopulates the {@link CacheService} by looking at the files on the disk and for each file found, retrieves the latest + * synchronized information and puts the cache file into the searchable snapshots cache. + * + * This method iterates over all node data paths and all shard directories in order to find the "snapshot_cache" directories that + * contain the cache files. When such a directory is found, the method iterates over the cache files and looks up their name/UUID in + * the existing Lucene documents that were loaded when instantiating the persistent cache index. If no information is found (i.e. no + * matching docs in the map of Lucene documents) then the file is deleted from disk. If a doc is found the stored fields are extracted + * from the Lucene document and are used to rebuild the necessary {@link CacheKey}, {@link SnapshotId}, {@link IndexId}, {@link ShardId} + * and cache file ranges objects. The Lucene document is then indexed again in the new persistent cache index (the current + * {@link CacheIndexWriter}) and the cache file is added back to the searchable snapshots cache again.
Note that adding cache + * file to the cache service might trigger evictions so previously reindexed Lucene cache files might be deleted again (see + * CacheService#onCacheFileRemoval(CacheFile) method which calls {@link #removeCacheFile(CacheFile)}). + * + * @param cacheService the {@link CacheService} to use when repopulating {@link CacheFile}. + */ + void repopulateCache(CacheService cacheService) { + ensureOpen(); + if (started.compareAndSet(false, true)) { + try { + for (CacheIndexWriter writer : writers) { + final NodeEnvironment.NodePath nodePath = writer.nodePath(); + logger.debug("loading persistent cache on data path [{}]", nodePath); + + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = writer.nodePath().resolve(shardId); + final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); + + if (Files.isDirectory(shardCachePath)) { + logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); + Files.walkFileTree(shardCachePath, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + final String id = buildId(file); + final Document cacheDocument = documents.get(id); + if (cacheDocument != null) { + logger.trace("indexing cache file with id [{}] in persistent cache index", id); + writer.updateCacheFile(id, cacheDocument); + + final CacheKey cacheKey = buildCacheKey(cacheDocument); + final long fileLength = getFileLength(cacheDocument); + final SortedSet> ranges = buildCacheFileRanges(cacheDocument); + + logger.trace("adding cache file with [id={}, key={}, ranges={}]", id, cacheKey, ranges); + cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); + } else { + logger.trace("deleting cache file [{}] (does not exist in persistent cache
index)", file); + Files.delete(file); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + return FileVisitResult.CONTINUE; + } + }); + } + } + } + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + logger.info("persistent cache index loaded"); + documents.clear(); + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index", e2); + e.addSuppressed(e2); + } + throw new UncheckedIOException("Failed to load persistent cache", e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } else { + assert false : "persistent cache is already loaded"; + } + } + + void commit() throws IOException { + ensureOpen(); + try { + for (CacheIndexWriter writer : writers) { + writer.prepareCommit(); + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index writer", e2); + e.addSuppressed(e2); + } + throw e; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } + + private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { + if (writers.stream().map(writer -> writer.indexWriter).anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { + try { + close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache index", e); + } + } + } + + public boolean hasDeletions() { + ensureOpen(); + for (CacheIndexWriter writer : writers) { + if (writer.indexWriter.hasDeletions()) { + return true; + } + } + return false; + } + + public long getNumDocs() { + ensureOpen(); + long count = 0L; + for (CacheIndexWriter writer : writers) { + count += writer.indexWriter.getPendingNumDocs(); + } + return count; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + IOUtils.close(writers); + } finally { + documents.clear(); + } + } + 
} + + /** + * Creates a list of {@link CacheIndexWriter}, one for each data path of the specified {@link NodeEnvironment}. + * + * @param nodeEnvironment the data node environment + * @return a list of {@link CacheIndexWriter} + */ + private static List createWriters(NodeEnvironment nodeEnvironment) { + final List writers = new ArrayList<>(); + boolean success = false; + try { + final NodeEnvironment.NodePath[] nodePaths = nodeEnvironment.nodePaths(); + for (NodeEnvironment.NodePath nodePath : nodePaths) { + writers.add(createCacheIndexWriter(nodePath)); + } + success = true; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create persistent cache writers", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(writers); + } + } + return unmodifiableList(writers); + } + + /** + * Creates a new {@link CacheIndexWriter} for the specified data path. The is a single instance per data path. + * + * @param nodePath the data path + * @return a new {@link CacheIndexWriter} instance + * @throws IOException if something went wrong + */ + static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath) throws IOException { + final List closeables = new ArrayList<>(); + boolean success = false; + try { + Path directoryPath = createCacheIndexFolder(nodePath); + final Directory directory = FSDirectory.open(directoryPath); + closeables.add(directory); + + final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer()); + config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); + config.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + config.setMergeScheduler(new SerialMergeScheduler()); + config.setRAMBufferSizeMB(1.0); + config.setCommitOnClose(false); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + closeables.add(indexWriter); + + final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter); + success = true; + return 
cacheIndexWriter; + } finally { + if (success == false) { + IOUtils.close(closeables); + } + } + } + + /** + * Load existing documents from persistent cache indices located at the root of every node path. + * + * @param nodeEnvironment the data node environment + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadDocuments(NodeEnvironment nodeEnvironment) { + final Map documents = new HashMap<>(); + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + final Path directoryPath = resolveCacheIndexFolder(nodePath); + if (Files.exists(directoryPath)) { + documents.putAll(loadDocuments(directoryPath)); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to load existing documents from persistent cache index", e); + } + return documents; + } + + /** + * Load existing documents from a persistent cache Lucene directory. + * + * @param directoryPath the Lucene directory path + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadDocuments(Path directoryPath) throws IOException { + final Map documents = new HashMap<>(); + try (Directory directory = FSDirectory.open(directoryPath)) { + try (IndexReader indexReader = DirectoryReader.open(directory)) { + logger.trace("loading documents from persistent cache index [{}]", directoryPath); + for (LeafReaderContext leafReaderContext : indexReader.leaves()) { + final LeafReader leafReader = leafReaderContext.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + for (int i = 0; i < leafReader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + final Document document = leafReader.document(i); + logger.trace("loading document [{}]", document); + documents.put(getValue(document, CACHE_ID_FIELD), document); + } + } + } + } catch (IndexNotFoundException e) { + logger.debug("persistent cache index does not exist yet", e); + } + } + return documents; + } + + /** + * Cleans any leftover searchable snapshot caches (files 
and Lucene indices) when a non-data node is starting up. + * This is useful when the node is repurposed and is not a data node anymore. + * + * @param nodeEnvironment the {@link NodeEnvironment} to cleanup + */ + public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) { + final boolean isDataNode = DiscoveryNode.isDataNode(settings); + if (isDataNode) { + assert false : "should not be called on data nodes"; + throw new IllegalStateException("Cannot clean searchable snapshot caches: node is a data node"); + } + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = nodePath.resolve(shardId); + final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); + final Path cacheDir = getShardCachePath(shardPath); + if (Files.isDirectory(cacheDir)) { + logger.debug("deleting searchable snapshot shard cache directory [{}]", cacheDir); + IOUtils.rm(cacheDir); + } + } + } + final Path cacheIndexDir = resolveCacheIndexFolder(nodePath); + if (Files.isDirectory(cacheIndexDir)) { + logger.debug("deleting searchable snapshot lucene directory [{}]", cacheIndexDir); + IOUtils.rm(cacheIndexDir); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to clean up searchable snapshots cache", e); + } + } + + /** + * A {@link CacheIndexWriter} contains a Lucene {@link Directory} with an {@link IndexWriter} that can be used to index documents in + * the persistent cache index. There is one {@link CacheIndexWriter} for each data path. 
+ */ + static class CacheIndexWriter implements Closeable { + + private final NodeEnvironment.NodePath nodePath; + private final IndexWriter indexWriter; + private final Directory directory; + + private CacheIndexWriter(NodeEnvironment.NodePath nodePath, Directory directory, IndexWriter indexWriter) { + this.nodePath = nodePath; + this.directory = directory; + this.indexWriter = indexWriter; + } + + NodeEnvironment.NodePath nodePath() { + return nodePath; + } + + void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + updateCacheFile(buildId(cacheFile), buildDocument(nodePath, cacheFile, cacheRanges)); + } + + void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { + final Term term = buildTerm(cacheFileId); + logger.debug("updating document with term [{}]", term); + indexWriter.updateDocument(term, cacheFileDocument); + } + + void deleteCacheFile(CacheFile cacheFile) throws IOException { + deleteCacheFile(buildId(cacheFile)); + } + + void deleteCacheFile(String cacheFileId) throws IOException { + final Term term = buildTerm(cacheFileId); + logger.debug("deleting document with term [{}]", term); + indexWriter.deleteDocuments(term); + } + + void prepareCommit() throws IOException { + logger.debug("preparing commit"); + final Map commitData = new HashMap<>(1); + commitData.put(NODE_VERSION_COMMIT_KEY, Integer.toString(Version.CURRENT.id)); + indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.prepareCommit(); + } + + void commit() throws IOException { + logger.debug("committing"); + indexWriter.commit(); + } + + @Override + public void close() throws IOException { + logger.debug("closing persistent cache index"); + IOUtils.close(indexWriter, directory); + } + + @Override + public String toString() { + return "[persistent cache index][" + nodePath + ']'; + } + } + + private static final String CACHE_ID_FIELD = "cache_id"; + private static final String CACHE_PATH_FIELD = "cache_path"; + 
private static final String CACHE_RANGES_FIELD = "cache_ranges"; + private static final String SNAPSHOT_ID_FIELD = "snapshot_id"; + private static final String SNAPSHOT_NAME_FIELD = "snapshot_name"; + private static final String INDEX_ID_FIELD = "index_id"; + private static final String INDEX_NAME_FIELD = "index_name"; + private static final String SHARD_INDEX_NAME_FIELD = "shard_index_name"; + private static final String SHARD_INDEX_ID_FIELD = "shard_index_id"; + private static final String SHARD_ID_FIELD = "shard_id"; + private static final String FILE_NAME_FIELD = "file_name"; + private static final String FILE_LENGTH_FIELD = "file_length"; + + private static String buildId(CacheFile cacheFile) { + return buildId(cacheFile.getFile()); + } + + private static String buildId(Path path) { + return path.getFileName().toString(); + } + + private static Term buildTerm(CacheFile cacheFile) { + return buildTerm(buildId(cacheFile)); + } + + private static Term buildTerm(String cacheFileUuid) { + return new Term(CACHE_ID_FIELD, cacheFileUuid); + } + + private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFile cacheFile, SortedSet> cacheRanges) + throws IOException { + final Document document = new Document(); + document.add(new StringField(CACHE_ID_FIELD, buildId(cacheFile), Field.Store.YES)); + document.add(new StringField(CACHE_PATH_FIELD, nodePath.indicesPath.relativize(cacheFile.getFile()).toString(), Field.Store.YES)); + + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.writeVInt(cacheRanges.size()); + for (Tuple cacheRange : cacheRanges) { + output.writeVLong(cacheRange.v1()); + output.writeVLong(cacheRange.v2()); + } + output.flush(); + document.add(new StoredField(CACHE_RANGES_FIELD, output.bytes().toBytesRef())); + } + + final CacheKey cacheKey = cacheFile.getCacheKey(); + document.add(new StringField(FILE_NAME_FIELD, cacheKey.getFileName(), Field.Store.YES)); + document.add(new StringField(FILE_LENGTH_FIELD, 
Long.toString(cacheFile.getLength()), Field.Store.YES)); + + final SnapshotId snapshotId = cacheKey.getSnapshotId(); + document.add(new StringField(SNAPSHOT_NAME_FIELD, snapshotId.getName(), Field.Store.YES)); + document.add(new StringField(SNAPSHOT_ID_FIELD, snapshotId.getUUID(), Field.Store.YES)); + + final IndexId indexId = cacheKey.getIndexId(); + document.add(new StringField(INDEX_NAME_FIELD, indexId.getName(), Field.Store.YES)); + document.add(new StringField(INDEX_ID_FIELD, indexId.getId(), Field.Store.YES)); + + final ShardId shardId = cacheKey.getShardId(); + document.add(new StringField(SHARD_INDEX_NAME_FIELD, shardId.getIndex().getName(), Field.Store.YES)); + document.add(new StringField(SHARD_INDEX_ID_FIELD, shardId.getIndex().getUUID(), Field.Store.YES)); + document.add(new StringField(SHARD_ID_FIELD, Integer.toString(shardId.getId()), Field.Store.YES)); + + return document; + } + + private static String getValue(Document document, String fieldName) { + final String value = document.get(fieldName); + assert value != null : "no value found for field [" + fieldName + "] and document [" + document + ']'; + return value; + } + + private static CacheKey buildCacheKey(Document document) { + return new CacheKey( + new SnapshotId(getValue(document, SNAPSHOT_NAME_FIELD), getValue(document, SNAPSHOT_ID_FIELD)), + new IndexId(getValue(document, INDEX_NAME_FIELD), getValue(document, INDEX_ID_FIELD)), + new ShardId( + new Index(getValue(document, SHARD_INDEX_NAME_FIELD), getValue(document, SHARD_INDEX_ID_FIELD)), + Integer.parseInt(getValue(document, SHARD_ID_FIELD)) + ), + getValue(document, FILE_NAME_FIELD) + ); + } + + private static long getFileLength(Document document) { + final String fileLength = getValue(document, FILE_LENGTH_FIELD); + assert fileLength != null; + return Long.parseLong(fileLength); + } + + private static SortedSet> buildCacheFileRanges(Document document) throws IOException { + final BytesRef cacheRangesBytesRef = 
document.getBinaryValue(CACHE_RANGES_FIELD); + assert cacheRangesBytesRef != null; + + final SortedSet> cacheRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + try (StreamInput input = new ByteBufferStreamInput(ByteBuffer.wrap(cacheRangesBytesRef.bytes))) { + final int length = input.readVInt(); + assert length > 0 : "empty cache ranges"; + Tuple previous = null; + for (int i = 0; i < length; i++) { + final Tuple range = Tuple.tuple(input.readVLong(), input.readVLong()); + assert range.v1() < range.v2() : range; + assert range.v2() <= getFileLength(document); + assert previous == null || previous.v2() < range.v1(); + + final boolean added = cacheRanges.add(range); + assert added : range + " already exist in " + cacheRanges; + previous = range; + } + } + return unmodifiableSortedSet(cacheRanges); + } + + static Path resolveCacheIndexFolder(NodeEnvironment.NodePath nodePath) { + return resolveCacheIndexFolder(nodePath.path); + } + + static Path resolveCacheIndexFolder(Path dataPath) { + return CacheService.resolveSnapshotCache(dataPath); + } + + /** + * Creates a directory for the snapshot cache Lucene index. 
+ */ + private static Path createCacheIndexFolder(NodeEnvironment.NodePath nodePath) throws IOException { + // "snapshot_cache" directory at the root of the specified data path + final Path snapshotCacheRootDir = resolveCacheIndexFolder(nodePath); + if (Files.exists(snapshotCacheRootDir) == false) { + logger.debug("creating new persistent cache index directory [{}]", snapshotCacheRootDir); + Files.createDirectories(snapshotCacheRootDir); + } + return snapshotCacheRootDir; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index fb57bc3cd53a3..2a521ec1e3e4d 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -38,8 +37,8 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.List; @@ -52,6 +51,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; import 
static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -604,14 +604,9 @@ private void executeTestCase( final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion); final List files = Collections.singletonList(new FileInfo(blobName, metadata, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( CacheService ignored = cacheService; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 4ec11b9c9ebac..b7dbd77873577 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -126,6 +126,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static 
org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -685,14 +686,9 @@ public void testClearCache() throws Exception { final IndexId indexId = new IndexId("_id", "_uuid"); final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 393cd0aeb3178..65db1837587b0 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -214,7 +214,7 @@ public void testFSync() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(100, 1000), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ 
-261,7 +261,7 @@ public void testFSyncOnEvictedFile() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ -315,7 +315,7 @@ public void testFSyncFailure() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 8d99956b03602..65a4b953d8c6e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; @@ -64,6 +65,10 @@ public final class TestUtils { private TestUtils() {} public static SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { + return randomPopulateAndReads(cacheFile, (fileChannel, aLong, aLong2) -> {}); + } + + public static SortedSet> randomPopulateAndReads(CacheFile cacheFile, TriConsumer consumer) { final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); final List> futures = new ArrayList<>(); final 
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( @@ -71,11 +76,12 @@ public static SortedSet> randomPopulateAndReads(final CacheFil random() ); for (int i = 0; i < between(0, 10); i++) { - final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); - final long end = randomLongBetween(start + 1L, cacheFile.getLength()); + final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); + final long end = randomLongBetween(Math.min(start + 1L, cacheFile.getLength()), cacheFile.getLength()); final Tuple range = Tuple.tuple(start, end); futures.add( cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { + consumer.apply(channel, from, to); ranges.add(Tuple.tuple(from, to)); progressUpdater.accept(to); }, deterministicTaskQueue.getThreadPool().generic()) @@ -144,6 +150,13 @@ public static SortedSet> mergeContiguousRanges(final SortedSet }); } + public static void assertCacheFileEquals(CacheFile expected, CacheFile actual) { + assertThat(actual.getLength(), equalTo(expected.getLength())); + assertThat(actual.getFile(), equalTo(expected.getFile())); + assertThat(actual.getCacheKey(), equalTo(expected.getCacheKey())); + assertThat(actual.getCompletedRanges(), equalTo(expected.getCompletedRanges())); + } + public static void assertCounter(IndexInputStats.Counter counter, long total, long count, long min, long max) { assertThat(counter.total(), equalTo(total)); assertThat(counter.count(), equalTo(count)); @@ -350,7 +363,7 @@ public Integer getNumberOfFSyncs(Path path) { @Override public FileChannel newFileChannel(Path path, Set options, FileAttribute... 
attrs) throws IOException { final AtomicInteger counter = files.computeIfAbsent(path, p -> new AtomicInteger(0)); - return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { + return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 37b9db47f28b1..56a7ff3a6c19b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -36,9 +36,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.junit.After; import org.junit.Before; +import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -85,7 +87,7 @@ public void tearDownTest() throws Exception { * @return a new {@link CacheService} instance configured with default settings */ protected CacheService defaultCacheService() { - return new CacheService(Settings.EMPTY, clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(Settings.EMPTY, clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -105,7 +107,7 @@ protected CacheService randomCacheService() { TimeValue.timeValueSeconds(scaledRandomIntBetween(1, 120)) ); } - return new CacheService(cacheSettings.build(), clusterService, 
threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(cacheSettings.build(), clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -119,11 +121,16 @@ protected CacheService createCacheService(final ByteSizeValue cacheSize, final B .build(), clusterService, threadPool, - AbstractSearchableSnapshotsTestCase::noOpCacheCleaner + new PersistentCache(nodeEnvironment) ); } - protected static void noOpCacheCleaner() {} + /** + * Returns a random shard data path for the specified {@link ShardId}. The returned path can be located on any of the data node paths. + */ + protected Path randomShardPath(ShardId shardId) { + return randomFrom(nodeEnvironment.availableShardPaths(shardId)); + } /** * @return a random {@link ByteSizeValue} that can be used to set {@link CacheService#SNAPSHOT_CACHE_SIZE_SETTING}. diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 8a1fc0847df62..6e42b6a1abf05 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; @@ -36,12 +37,14 @@ import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; +import static 
org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +@LuceneTestCase.SuppressFileSystems("ExtrasFS") // we don't want extra empty dirs in snapshot cache root dirs public class CacheServiceTests extends AbstractSearchableSnapshotsTestCase { private static FSyncTrackingFileSystemProvider fileSystemProvider; @@ -67,11 +70,11 @@ public void testCacheSynchronization() throws Exception { logger.debug("--> creating shard cache directories on disk"); final Path[] shardsCacheDirs = new Path[numShards]; for (int i = 0; i < numShards; i++) { - final Path shardDataPath = randomFrom(nodeEnvironment.availableShardPaths(new ShardId(index, i))); + final Path shardDataPath = randomShardPath(new ShardId(index, i)); assertFalse(Files.exists(shardDataPath)); logger.debug("--> creating directories [{}] for shard [{}]", shardDataPath.toAbsolutePath(), i); - shardsCacheDirs[i] = Files.createDirectories(CacheService.resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); + shardsCacheDirs[i] = Files.createDirectories(resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); } try (CacheService cacheService = defaultCacheService()) { @@ -108,7 +111,7 @@ public void testCacheSynchronization() throws Exception { final ShardId shardId = new ShardId(index, randomIntBetween(0, numShards - 1)); final String fileName = String.format(Locale.ROOT, "file_%d_%d", iteration, i); final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, fileName); - final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(1, 10_000), shardsCacheDirs[shardId.id()]); + final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(0, 10_000), shardsCacheDirs[shardId.id()]); final 
CacheFile.EvictionListener listener = evictedCacheFile -> {}; cacheFile.acquire(listener); @@ -172,7 +175,6 @@ public void testCacheSynchronization() throws Exception { } public void testPut() throws Exception { - final Path cacheDir = createTempDir(); try (CacheService cacheService = defaultCacheService()) { final long fileLength = randomLongBetween(0L, 1000L); final CacheKey cacheKey = new CacheKey( @@ -181,6 +183,10 @@ public void testPut() throws Exception { new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), randomInt(5)), randomAlphaOfLength(105).toLowerCase(Locale.ROOT) ); + + final Path cacheDir = Files.createDirectories( + resolveSnapshotCache(randomShardPath(cacheKey.getShardId())).resolve(cacheKey.getSnapshotId().getUUID()) + ); final String cacheFileUuid = UUIDs.randomBase64UUID(random()); final SortedSet> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet(); @@ -205,6 +211,7 @@ public void testPut() throws Exception { FileNotFoundException.class, () -> cacheService.put(cacheKey, fileLength, cacheDir, cacheFileUuid, cacheFileRanges) ); + cacheService.start(); assertThat(exception.getMessage(), containsString(cacheFileUuid)); } } @@ -214,6 +221,7 @@ public void testRunIfShardMarkedAsEvictedInCache() throws Exception { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())); final CacheService cacheService = defaultCacheService(); cacheService.setCacheSyncInterval(TimeValue.ZERO); @@ -230,7 +238,7 @@ public void testRunIfShardMarkedAsEvictedInCache() 
throws Exception { final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); - final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, createTempDir()); + final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, cacheDir); cacheFile.acquire(evictionListener); cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java new file mode 100644 index 0000000000000..378d9f9bb83ec --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.stream.Collectors; + +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; +import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; +import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static 
org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

/**
 * Unit tests for {@link PersistentCache}: the Lucene-backed index that persists searchable-snapshot
 * cache file metadata across node restarts.
 *
 * <p>Covers three behaviors visible in {@link PersistentCache}'s API:
 * <ul>
 *   <li>writing/updating/deleting documents through {@code CacheIndexWriter} and reloading them
 *       with {@link PersistentCache#loadDocuments},</li>
 *   <li>repopulating a fresh {@link CacheService} from previously synchronized cache files,</li>
 *   <li>{@link PersistentCache#cleanUp} removing on-disk cache files on non-data nodes.</li>
 * </ul>
 */
public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase {

    /**
     * Exercises {@code CacheIndexWriter} over many iterations of random updates, inserts and
     * deletes, verifying after each iteration that {@link PersistentCache#loadDocuments} reflects
     * exactly the committed state (uncommitted changes must not be visible on reload).
     */
    public void testCacheIndexWriter() throws Exception {
        final NodeEnvironment.NodePath nodePath = randomFrom(nodeEnvironment.nodePaths());

        int docId = 0;
        // cache-id -> iteration in which the document was last committed
        final Map<String, Integer> liveDocs = new HashMap<>();
        // cache-ids whose documents were deleted in a committed iteration
        final Set<String> deletedDocs = new HashSet<>();

        for (int iter = 0; iter < 20; iter++) {

            // the index folder only exists once the first writer has been created
            final Path snapshotCacheIndexDir = resolveCacheIndexFolder(nodePath);
            assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0));

            // load existing documents from persistent cache index before each iteration
            final Map<String, Document> documents = PersistentCache.loadDocuments(nodeEnvironment);
            assertThat(documents.size(), equalTo(liveDocs.size()));

            try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) {
                assertThat(writer.nodePath(), sameInstance(nodePath));

                // verify that existing documents are loaded
                for (Map.Entry<String, Integer> liveDoc : liveDocs.entrySet()) {
                    final Document document = documents.get(liveDoc.getKey());
                    assertThat("Document should be loaded", document, notNullValue());
                    final String iteration = document.get("update_iteration");
                    assertThat(iteration, equalTo(String.valueOf(liveDoc.getValue())));
                    writer.updateCacheFile(liveDoc.getKey(), document);
                }

                // verify that deleted documents are not loaded
                for (String deletedDoc : deletedDocs) {
                    final Document document = documents.get(deletedDoc);
                    assertThat("Document should not be loaded", document, nullValue());
                }

                // random updates of existing documents
                final Map<String, Integer> updatedDocs = new HashMap<>();
                for (String cacheId : randomSubsetOf(liveDocs.keySet())) {
                    final Document document = new Document();
                    document.add(new StringField("cache_id", cacheId, Field.Store.YES));
                    document.add(new StringField("update_iteration", String.valueOf(iter), Field.Store.YES));
                    writer.updateCacheFile(cacheId, document);

                    updatedDocs.put(cacheId, iter);
                }

                // create new random documents
                final Map<String, Integer> newDocs = new HashMap<>();
                for (int i = 0; i < between(1, 10); i++) {
                    final String cacheId = String.valueOf(docId++);
                    final Document document = new Document();
                    document.add(new StringField("cache_id", cacheId, Field.Store.YES));
                    document.add(new StringField("update_iteration", String.valueOf(iter), Field.Store.YES));
                    writer.updateCacheFile(cacheId, document);

                    newDocs.put(cacheId, iter);
                }

                // deletes random documents
                final Map<String, Integer> removedDocs = new HashMap<>();
                for (String cacheId : randomSubsetOf(Sets.union(liveDocs.keySet(), newDocs.keySet()))) {
                    writer.deleteCacheFile(cacheId);

                    removedDocs.put(cacheId, iter);
                }

                // randomly prepare-and-commit; only a full commit makes the changes durable
                boolean commit = false;
                if (frequently()) {
                    writer.prepareCommit();
                    if (frequently()) {
                        writer.commit();
                        commit = true;
                    }
                }

                // fold this iteration's changes into the expected state only if they were committed
                if (commit) {
                    liveDocs.putAll(updatedDocs);
                    liveDocs.putAll(newDocs);
                    for (String cacheId : removedDocs.keySet()) {
                        liveDocs.remove(cacheId);
                        deletedDocs.add(cacheId);
                    }
                }
            }
        }
    }

    /**
     * Verifies that a new {@link CacheService} started over an existing on-disk cache repopulates
     * itself with the cache files that survived (i.e. were neither evicted nor deleted), and that
     * the repopulated {@link CacheFile} instances are equal but not identical to the originals.
     */
    public void testRepopulateCache() throws Exception {
        final CacheService cacheService = defaultCacheService();
        // sync interval of zero so synchronizeCache() persists everything immediately
        cacheService.setCacheSyncInterval(TimeValue.ZERO);
        cacheService.start();

        final List<CacheFile> cacheFiles = generateRandomCacheFiles(cacheService);
        cacheService.synchronizeCache();

        if (cacheFiles.isEmpty() == false) {
            final List<CacheFile> removedCacheFiles = randomSubsetOf(cacheFiles);
            for (CacheFile removedCacheFile : removedCacheFiles) {
                if (randomBoolean()) {
                    // evict cache file from the cache
                    cacheService.removeFromCache(removedCacheFile.getCacheKey());
                } else {
                    // delete the backing file on disk without notifying the cache
                    IOUtils.rm(removedCacheFile.getFile());
                }
                cacheFiles.remove(removedCacheFile);
            }
        }
        cacheService.stop();

        final CacheService newCacheService = defaultCacheService();
        newCacheService.start();
        for (CacheFile cacheFile : cacheFiles) {
            CacheFile newCacheFile = newCacheService.get(cacheFile.getCacheKey(), cacheFile.getLength(), cacheFile.getFile().getParent());
            assertThat(newCacheFile, notNullValue());
            assertThat(newCacheFile, not(sameInstance(cacheFile)));
            assertCacheFileEquals(newCacheFile, cacheFile);
        }
        newCacheService.stop();
    }

    /**
     * Verifies that {@link PersistentCache#cleanUp} deletes all on-disk cache files when the node
     * is configured without the data role (searchable-snapshot caches only belong on data nodes).
     */
    public void testCleanUp() throws Exception {
        final List<Path> cacheFiles;
        try (CacheService cacheService = defaultCacheService()) {
            cacheService.start();
            cacheFiles = generateRandomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList());
            // clean-up must work whether or not the cache was synchronized before shutdown
            if (randomBoolean()) {
                cacheService.synchronizeCache();
            }
        }

        // any built-in role except the data role
        final Settings nodeSettings = Settings.builder()
            .put(NODE_ROLES_SETTING.getKey(), randomValueOtherThan(DATA_ROLE, () -> randomFrom(BUILT_IN_ROLES)).roleName())
            .build();

        assertTrue(cacheFiles.stream().allMatch(Files::exists));
        PersistentCache.cleanUp(nodeSettings, nodeEnvironment);
        assertTrue(cacheFiles.stream().noneMatch(Files::exists));
    }

    /**
     * Generates 1 or more cache files using the specified {@link CacheService}.
     *
     * <p>Builds a random hierarchy of snapshots / indices / shards, creates cache files under each
     * shard's snapshot cache directory and populates them with random ranges of {@code 0xff} bytes.
     * Only cache files that ended up with at least one populated range are returned.
     *
     * @param cacheService the started {@link CacheService} to create cache files with
     * @return the cache files that have at least one completed range
     */
    private List<CacheFile> generateRandomCacheFiles(CacheService cacheService) throws Exception {
        final byte[] buffer = new byte[1024];
        Arrays.fill(buffer, (byte) 0xff);

        final List<CacheFile> cacheFiles = new ArrayList<>();
        for (int snapshots = 0; snapshots < between(1, 2); snapshots++) {
            SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()));
            for (int indices = 0; indices < between(1, 2); indices++) {
                IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()));
                for (int shards = 0; shards < between(1, 2); shards++) {
                    ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards);

                    final Path cacheDir = Files.createDirectories(
                        CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())
                    );

                    for (int files = 0; files < between(1, 2); files++) {
                        final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, "file_" + files);
                        // cache file length is bounded by the write buffer size
                        final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir);

                        final CacheFile.EvictionListener listener = evictedCacheFile -> {};
                        // acquire/release around population so the file cannot be evicted mid-write
                        cacheFile.acquire(listener);
                        try {
                            SortedSet<Tuple<Long, Long>> ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> {
                                try {
                                    // NOTE(review): wrap(array, offset, length) — 'to' is passed as the
                                    // length argument, not 'to - from'; confirm the callback receives
                                    // (position, length) rather than (start, end) offsets
                                    channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to)));
                                } catch (IOException e) {
                                    throw new AssertionError(e);
                                }
                            });
                            if (ranges.isEmpty() == false) {
                                cacheFiles.add(cacheFile);
                            }
                        } finally {
                            cacheFile.release(listener);
                        }
                    }
                }
            }
        }
        return cacheFiles;
    }
}