From e15eebdf1cc295aae62941245e999b792dcd4b7c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 15:25:45 +0100 Subject: [PATCH 1/2] Make searchable snapshots cache persistent (#65725) The searchable snapshots cache implemented in 7.10 is not persisted across node restarts, forcing data nodes to download files from the snapshot repository again once the node is restarted. This commit introduces a new Lucene index that is used to store information about cache files. The information about cache files is periodically updated and committed in this index as part of the cache synchronization task added in #64696. When the data node starts, the Lucene index is used to load the cache file information into memory; this information is then used to repopulate the searchable snapshots cache with the cache files that exist on disk. Since data nodes can have one or more data paths, this change introduces a Lucene index per data path. Information about cache files is updated in the Lucene index located on the same data path as the cache files. 
--- ...leSnapshotsPersistentCacheIntegTests.java} | 120 ++-- .../index/store/cache/CacheFile.java | 5 + .../SearchableSnapshotIndexEventListener.java | 10 +- .../SearchableSnapshots.java | 11 +- .../cache/CacheService.java | 55 +- .../cache/NodeEnvironmentCacheCleaner.java | 50 -- .../cache/PersistentCache.java | 617 ++++++++++++++++++ ...SearchableSnapshotDirectoryStatsTests.java | 13 +- .../SearchableSnapshotDirectoryTests.java | 10 +- .../index/store/cache/CacheFileTests.java | 6 +- .../index/store/cache/TestUtils.java | 19 +- .../AbstractSearchableSnapshotsTestCase.java | 15 +- .../cache/CacheServiceTests.java | 18 +- .../cache/PersistentCacheTests.java | 239 +++++++ 14 files changed, 1013 insertions(+), 175 deletions(-) rename x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/{SearchableSnapshotsCacheClearingIntegTests.java => cache/SearchableSnapshotsPersistentCacheIntegTests.java} (60%) delete mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java similarity index 60% rename from x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java rename to 
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index e4e9115393d32..34b657864fa81 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -4,88 +4,41 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.searchablesnapshots; +package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.lucene.mockfile.FilterFileSystemProvider; -import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.apache.lucene.document.Document; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.junit.AfterClass; -import org.junit.BeforeClass; 
+import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import java.io.IOException; import java.nio.file.DirectoryStream; -import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; import java.util.Locale; +import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; -public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase { - - private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider; - - @BeforeClass - public static void installDeleteBlockingFileSystemProvider() { - FileSystem current = PathUtils.getDefaultFileSystem(); - deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current); - PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null)); - } - - @AfterClass - public static void removeDeleteBlockingFileSystemProvider() { - PathUtilsForTesting.teardown(); - } - - void startBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(true); - } - - void stopBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(false); - } - - private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider { - - AtomicBoolean injectFailures = new AtomicBoolean(); - - DeleteBlockingFileSystemProvider(FileSystem inner) { - super("deleteblocking://", inner); - } - - @Override - public 
boolean deleteIfExists(Path path) throws IOException { - if (injectFailures.get()) { - throw new IOException("blocked deletion of " + path); - } else { - return super.deleteIfExists(path); - } - } - - } +public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - public void testCacheDirectoriesRemovedOnStartup() throws Exception { + public void testCacheSurviveRestart() throws Exception { final String fsRepoName = randomAlphaOfLength(10); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); @@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes(); final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName(); - final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( - restoredIndexName, + mountSnapshot( fsRepoName, snapshotName, indexName, - Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(), - Strings.EMPTY_ARRAY, - true + restoredIndexName, + Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build() ); - - final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); - assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); ensureGreen(restoredIndexName); final Index restoredIndex = client().admin() @@ -143,7 +91,9 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { .getIndex(); final IndexService indexService = 
internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex); - final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath()); + final ShardPath shardPath = indexService.getShard(0).shardPath(); + final Path shardCachePath = CacheService.getShardCachePath(shardPath); + assertTrue(Files.isDirectory(shardCachePath)); final Set cacheFiles = new HashSet<>(); try (DirectoryStream snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) { @@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { } assertFalse("no cache files found", cacheFiles.isEmpty()); - startBlockingDeletes(); + CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + PersistentCache persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) { - assertTrue(Files.isDirectory(shardCachePath)); - for (Path cacheFile : cacheFiles) { - assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile)); + try { + assertTrue(Files.isDirectory(shardCachePath)); + + final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath()); + assertTrue(Files.isDirectory(persistentCacheIndexDir)); + + final Map documents = PersistentCache.loadDocuments(persistentCacheIndexDir); + assertThat(documents.size(), equalTo(cacheFiles.size())); + + for (Path cacheFile : cacheFiles) { + final String cacheFileName = cacheFile.getFileName().toString(); + assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile)); + assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue()); + } + } catch (IOException e) { + throw 
new AssertionError(e); } - stopBlockingDeletes(); return Settings.EMPTY; } }); + persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); ensureGreen(restoredIndexName); - for (Path cacheFile : cacheFiles) { - assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)); - } + cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); + + assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)))); + cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo(0L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index c1dd02cb505c8..5793f4d33c5b3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -158,6 +158,11 @@ FileChannel getChannel() { return reference == null ? 
null : reference.fileChannel; } + // Only used in tests + SortedSet> getCompletedRanges() { + return tracker.getCompletedRanges(); + } + public void acquire(final EvictionListener listener) throws IOException { assert listener != null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index f3f42ff233d7a..f3ede77a0c58d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -32,6 +32,7 @@ import java.nio.file.Path; +import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; @@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe this.cacheService = cacheService; } + /** + * Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard + * snapshot information that contains the list of shard's Lucene files. 
+ * + * @param indexShard the shard that is about to recover + * @param indexSettings the shard's index settings + */ @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); @@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS } private static void ensureSnapshotIsLoaded(IndexShard indexShard) { - final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory()); + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); assert directory != null; final StepListener preWarmListener = new StepListener<>(); final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 23f16464d1eea..54321adc8b2f2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -65,7 +65,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.TransportRepositoryStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction; import 
org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestRepositoryStatsAction; @@ -218,12 +218,7 @@ public Collection createComponents( this.threadPool.set(threadPool); this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())); if (DiscoveryNode.isDataNode(settings)) { - final CacheService cacheService = new CacheService( - settings, - clusterService, - threadPool, - new NodeEnvironmentCacheCleaner(nodeEnvironment) - ); + final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment)); this.cacheService.set(cacheService); components.add(cacheService); final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( @@ -234,6 +229,8 @@ public Collection createComponents( ); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); + } else { + PersistentCache.cleanUp(settings, nodeEnvironment); } return Collections.unmodifiableList(components); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index b71fd9275e650..a34e67fffb5fe 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -114,9 +114,9 @@ public class CacheService extends AbstractLifecycleComponent { private final CacheSynchronizationTask cacheSyncTask; private final TimeValue cacheSyncStopTimeout; private final ReentrantLock cacheSyncLock; + private final PersistentCache persistentCache; private final Cache cache; private final ByteSizeValue 
cacheSize; - private final Runnable cacheCleaner; private final ByteSizeValue rangeSize; private final KeyedLock shardsEvictionLock; private final Set evictedShards; @@ -127,11 +127,10 @@ public CacheService( final Settings settings, final ClusterService clusterService, final ThreadPool threadPool, - final Runnable cacheCleaner + final PersistentCache persistentCache ) { this.threadPool = Objects.requireNonNull(threadPool); this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings); - this.cacheCleaner = Objects.requireNonNull(cacheCleaner); this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings); this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) @@ -140,6 +139,7 @@ public CacheService( // are done with reading/writing the cache file .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); + this.persistentCache = Objects.requireNonNull(persistentCache); this.shardsEvictionLock = new KeyedLock<>(); this.evictedShards = ConcurrentCollections.newConcurrentSet(); this.numberOfCacheFilesToSync = new AtomicLong(); @@ -157,14 +157,14 @@ public static Path getShardCachePath(ShardPath shardPath) { return resolveSnapshotCache(shardPath.getDataPath()); } - static Path resolveSnapshotCache(Path path) { + public static Path resolveSnapshotCache(Path path) { return path.resolve("snapshot_cache"); } @Override protected void doStart() { + persistentCache.repopulateCache(this); cacheSyncTask.rescheduleIfNecessary(); - cacheCleaner.run(); } @Override @@ -181,10 +181,15 @@ protected void doStop() { logger.warn("interrupted while waiting for cache sync lock", e); } cacheSyncTask.close(); - cache.invalidateAll(); } finally { - if (acquired) { - cacheSyncLock.unlock(); + try { + persistentCache.close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache", e); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } } } } @@ -397,12 +402,18 @@ private void onCacheFileUpdate(CacheFile cacheFile) 
{ /** * This method is invoked after a {@link CacheFile} is evicted from the cache. *

- * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted. + * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted and removes it from the persistent cache. * * @param cacheFile the evicted instance */ private void onCacheFileRemoval(CacheFile cacheFile) { IOUtils.closeWhileHandlingException(cacheFile::startEviction); + try { + persistentCache.removeCacheFile(cacheFile); + } catch (Exception e) { + assert e instanceof IOException : e; + logger.warn("failed to remove cache file from persistent cache", e); + } } // used in tests @@ -410,6 +421,11 @@ boolean isCacheFileToSync(CacheFile cacheFile) { return cacheFilesToSync.contains(cacheFile); } + // used in tests + PersistentCache getPersistentCache() { + return persistentCache; + } + /** * Synchronize the cache files and their parent directories on disk. * @@ -457,23 +473,36 @@ protected void synchronizeCache() { ranges.size() ); final Path cacheDir = cacheFilePath.toAbsolutePath().getParent(); - if (cacheDirs.add(cacheDir)) { + boolean shouldPersist = cacheDirs.contains(cacheDir); + if (shouldPersist == false) { try { - IOUtils.fsync(cacheDir, true, false); + IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed logger.trace("cache directory [{}] synchronized", cacheDir); + cacheDirs.add(cacheDir); + shouldPersist = true; } catch (Exception e) { assert e instanceof IOException : e; + shouldPersist = false; logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e); } } - // TODO Index searchable snapshot shard information + cache file ranges in Lucene - count += 1L; + if (shouldPersist) { + persistentCache.addCacheFile(cacheFile, ranges); + count += 1L; + } } } catch (Exception e) { assert e instanceof IOException : e; logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e); } } + if (count > 0 || persistentCache.hasDeletions()) { + 
try { + persistentCache.commit(); + } catch (IOException e) { + logger.error("failed to commit persistent cache after synchronization", e); + } + } if (logger.isDebugEnabled()) { final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos; logger.debug( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java deleted file mode 100644 index f7edc2c663bb8..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.searchablesnapshots.cache; - -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardPath; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; - -/** - * Cleans any leftover searchable snapshot caches when a node is starting up. 
- */ -public class NodeEnvironmentCacheCleaner implements Runnable { - - private final NodeEnvironment nodeEnvironment; - - public NodeEnvironmentCacheCleaner(NodeEnvironment nodeEnvironment) { - this.nodeEnvironment = nodeEnvironment; - } - - @Override - public void run() { - try { - for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { - for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { - final Path shardDataPath = nodePath.resolve(shardId); - final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); - final Path shardCachePath = CacheService.getShardCachePath(shardPath); - if (Files.isDirectory(shardCachePath)) { - IOUtils.rm(shardCachePath); - } - } - } - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java new file mode 100644 index 0000000000000..b656a2b40979a --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -0,0 +1,617 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SerialMergeScheduler; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; + +import 
java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Collections.synchronizedMap; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableSortedSet; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath; + +public class PersistentCache implements Closeable { + + private static final Logger logger = LogManager.getLogger(PersistentCache.class); + + private static final String NODE_VERSION_COMMIT_KEY = "node_version"; + + private final NodeEnvironment nodeEnvironment; + private final Map documents; + private final List writers; + private final AtomicBoolean started; + private final AtomicBoolean closed; + + public PersistentCache(NodeEnvironment nodeEnvironment) { + this.documents = synchronizedMap(loadDocuments(nodeEnvironment)); + this.writers = createWriters(nodeEnvironment); + this.nodeEnvironment = nodeEnvironment; + this.started = new AtomicBoolean(); + this.closed = new AtomicBoolean(); + } + + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("Persistent cache is already closed"); + } + } + + private void ensureStarted() { + if (started.get() == false) { + throw new IllegalStateException("Persistent cache is not started"); + } + } + + /** + * @return the {@link CacheIndexWriter} to use for the given {@link CacheFile} + */ + private CacheIndexWriter getWriter(CacheFile cacheFile) { + ensureOpen(); + if (writers.size() == 1) { + 
return writers.get(0); + } else { + final Path path = cacheFile.getFile().toAbsolutePath(); + return writers.stream() + .filter(writer -> path.startsWith(writer.nodePath().path)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Failed to find a Lucene index for cache file path [" + path + ']')); + } + } + + public void addCacheFile(CacheFile cacheFile, SortedSet> ranges) throws IOException { + ensureStarted(); + getWriter(cacheFile).updateCacheFile(cacheFile, ranges); + } + + public void removeCacheFile(CacheFile cacheFile) throws IOException { + ensureStarted(); + getWriter(cacheFile).deleteCacheFile(cacheFile); + } + + /** + * This method repopulates the {@link CacheService} by looking at the files on the disk and for each file found, retrieves the latest + * synchronized information and puts the cache file into the searchable snapshots cache. + * + * This method iterates over all node data paths and all shard directories in order to find the "snapshot_cache" directories that + * contain the cache files. When such a directory is found, the method iterates over the cache files and looks up their name/UUID in + * the existing Lucene documents that were loaded when instantiating the persistent cache index. If no information is found (i.e. no + * matching docs in the map of Lucene documents) then the file is deleted from disk. If a doc is found the stored fields are extracted + * from the Lucene document and are used to rebuild the necessary {@link CacheKey}, {@link SnapshotId}, {@link IndexId}, {@link ShardId} + * and cache file ranges objects. The Lucene document is then indexed again in the new persistent cache index (the current + * {@link CacheIndexWriter}) and the cache file is added back to the searchable snapshots cache again. 
Note that adding cache + file to the cache service might trigger evictions so previously reindexed Lucene cache files might be deleted again (see + CacheService#onCacheFileRemoval(CacheFile) method which calls {@link #removeCacheFile(CacheFile)}). + * + * @param cacheService the {@link CacheService} to use when repopulating {@link CacheFile}. + */ + void repopulateCache(CacheService cacheService) { + ensureOpen(); + if (started.compareAndSet(false, true)) { + try { + for (CacheIndexWriter writer : writers) { + final NodeEnvironment.NodePath nodePath = writer.nodePath(); + logger.debug("loading persistent cache on data path [{}]", nodePath); + + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = writer.nodePath().resolve(shardId); + final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); + + if (Files.isDirectory(shardCachePath)) { + logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); + Files.walkFileTree(shardCachePath, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + final String id = buildId(file); + final Document cacheDocument = documents.get(id); + if (cacheDocument != null) { + logger.trace("indexing cache file with id [{}] in persistent cache index", id); + writer.updateCacheFile(id, cacheDocument); + + final CacheKey cacheKey = buildCacheKey(cacheDocument); + final long fileLength = getFileLength(cacheDocument); + final SortedSet> ranges = buildCacheFileRanges(cacheDocument); + + logger.trace("adding cache file with [id={}, key={}, ranges={}]", id, cacheKey, ranges); + cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); + } else { + logger.trace("deleting cache file [{}] (does not exist in persistent cache 
index)", file); + Files.delete(file); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + return FileVisitResult.CONTINUE; + } + }); + } + } + } + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + logger.info("persistent cache index loaded"); + documents.clear(); + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index", e2); + e.addSuppressed(e2); + } + throw new UncheckedIOException("Failed to load persistent cache", e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } else { + assert false : "persistent cache is already loaded"; + } + } + + void commit() throws IOException { + ensureOpen(); + try { + for (CacheIndexWriter writer : writers) { + writer.prepareCommit(); + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index writer", e2); + e.addSuppressed(e2); + } + throw e; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } + + private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { + if (writers.stream().map(writer -> writer.indexWriter).anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { + try { + close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache index", e); + } + } + } + + public boolean hasDeletions() { + ensureOpen(); + for (CacheIndexWriter writer : writers) { + if (writer.indexWriter.hasDeletions()) { + return true; + } + } + return false; + } + + public long getNumDocs() { + ensureOpen(); + long count = 0L; + for (CacheIndexWriter writer : writers) { + count += writer.indexWriter.getPendingNumDocs(); + } + return count; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + IOUtils.close(writers); + } finally { + documents.clear(); + } + } + 
} + + /** + * Creates a list of {@link CacheIndexWriter}, one for each data path of the specified {@link NodeEnvironment}. + * + * @param nodeEnvironment the data node environment + * @return a list of {@link CacheIndexWriter} + */ + private static List createWriters(NodeEnvironment nodeEnvironment) { + final List writers = new ArrayList<>(); + boolean success = false; + try { + final NodeEnvironment.NodePath[] nodePaths = nodeEnvironment.nodePaths(); + for (NodeEnvironment.NodePath nodePath : nodePaths) { + writers.add(createCacheIndexWriter(nodePath)); + } + success = true; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create persistent cache writers", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(writers); + } + } + return unmodifiableList(writers); + } + + /** + * Creates a new {@link CacheIndexWriter} for the specified data path. The is a single instance per data path. + * + * @param nodePath the data path + * @return a new {@link CacheIndexWriter} instance + * @throws IOException if something went wrong + */ + static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath) throws IOException { + final List closeables = new ArrayList<>(); + boolean success = false; + try { + Path directoryPath = createCacheIndexFolder(nodePath); + final Directory directory = FSDirectory.open(directoryPath); + closeables.add(directory); + + final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer()); + config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); + config.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + config.setMergeScheduler(new SerialMergeScheduler()); + config.setRAMBufferSizeMB(1.0); + config.setCommitOnClose(false); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + closeables.add(indexWriter); + + final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter); + success = true; + return 
cacheIndexWriter; + } finally { + if (success == false) { + IOUtils.close(closeables); + } + } + } + + /** + * Load existing documents from persistent cache indices located at the root of every node path. + * + * @param nodeEnvironment the data node environment + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadDocuments(NodeEnvironment nodeEnvironment) { + final Map documents = new HashMap<>(); + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + final Path directoryPath = resolveCacheIndexFolder(nodePath); + if (Files.exists(directoryPath)) { + documents.putAll(loadDocuments(directoryPath)); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to load existing documents from persistent cache index", e); + } + return documents; + } + + /** + * Load existing documents from a persistent cache Lucene directory. + * + * @param directoryPath the Lucene directory path + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadDocuments(Path directoryPath) throws IOException { + final Map documents = new HashMap<>(); + try (Directory directory = FSDirectory.open(directoryPath)) { + try (IndexReader indexReader = DirectoryReader.open(directory)) { + logger.trace("loading documents from persistent cache index [{}]", directoryPath); + for (LeafReaderContext leafReaderContext : indexReader.leaves()) { + final LeafReader leafReader = leafReaderContext.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + for (int i = 0; i < leafReader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + final Document document = leafReader.document(i); + logger.trace("loading document [{}]", document); + documents.put(getValue(document, CACHE_ID_FIELD), document); + } + } + } + } catch (IndexNotFoundException e) { + logger.debug("persistent cache index does not exist yet", e); + } + } + return documents; + } + + /** + * Cleans any leftover searchable snapshot caches (files 
and Lucene indices) when a non-data node is starting up. + * This is useful when the node is repurposed and is not a data node anymore. + * + * @param nodeEnvironment the {@link NodeEnvironment} to cleanup + */ + public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) { + final boolean isDataNode = DiscoveryNode.isDataNode(settings); + if (isDataNode) { + assert false : "should not be called on data nodes"; + throw new IllegalStateException("Cannot clean searchable snapshot caches: node is a data node"); + } + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = nodePath.resolve(shardId); + final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); + final Path cacheDir = getShardCachePath(shardPath); + if (Files.isDirectory(cacheDir)) { + logger.debug("deleting searchable snapshot shard cache directory [{}]", cacheDir); + IOUtils.rm(cacheDir); + } + } + } + final Path cacheIndexDir = resolveCacheIndexFolder(nodePath); + if (Files.isDirectory(cacheIndexDir)) { + logger.debug("deleting searchable snapshot lucene directory [{}]", cacheIndexDir); + IOUtils.rm(cacheIndexDir); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to clean up searchable snapshots cache", e); + } + } + + /** + * A {@link CacheIndexWriter} contains a Lucene {@link Directory} with an {@link IndexWriter} that can be used to index documents in + * the persistent cache index. There is one {@link CacheIndexWriter} for each data path. 
+ */ + static class CacheIndexWriter implements Closeable { + + private final NodeEnvironment.NodePath nodePath; + private final IndexWriter indexWriter; + private final Directory directory; + + private CacheIndexWriter(NodeEnvironment.NodePath nodePath, Directory directory, IndexWriter indexWriter) { + this.nodePath = nodePath; + this.directory = directory; + this.indexWriter = indexWriter; + } + + NodeEnvironment.NodePath nodePath() { + return nodePath; + } + + void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + updateCacheFile(buildId(cacheFile), buildDocument(nodePath, cacheFile, cacheRanges)); + } + + void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { + final Term term = buildTerm(cacheFileId); + logger.debug("updating document with term [{}]", term); + indexWriter.updateDocument(term, cacheFileDocument); + } + + void deleteCacheFile(CacheFile cacheFile) throws IOException { + deleteCacheFile(buildId(cacheFile)); + } + + void deleteCacheFile(String cacheFileId) throws IOException { + final Term term = buildTerm(cacheFileId); + logger.debug("deleting document with term [{}]", term); + indexWriter.deleteDocuments(term); + } + + void prepareCommit() throws IOException { + logger.debug("preparing commit"); + final Map commitData = new HashMap<>(1); + commitData.put(NODE_VERSION_COMMIT_KEY, Integer.toString(Version.CURRENT.id)); + indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.prepareCommit(); + } + + void commit() throws IOException { + logger.debug("committing"); + indexWriter.commit(); + } + + @Override + public void close() throws IOException { + logger.debug("closing persistent cache index"); + IOUtils.close(indexWriter, directory); + } + + @Override + public String toString() { + return "[persistent cache index][" + nodePath + ']'; + } + } + + private static final String CACHE_ID_FIELD = "cache_id"; + private static final String CACHE_PATH_FIELD = "cache_path"; + 
private static final String CACHE_RANGES_FIELD = "cache_ranges"; + private static final String SNAPSHOT_ID_FIELD = "snapshot_id"; + private static final String SNAPSHOT_NAME_FIELD = "snapshot_name"; + private static final String INDEX_ID_FIELD = "index_id"; + private static final String INDEX_NAME_FIELD = "index_name"; + private static final String SHARD_INDEX_NAME_FIELD = "shard_index_name"; + private static final String SHARD_INDEX_ID_FIELD = "shard_index_id"; + private static final String SHARD_ID_FIELD = "shard_id"; + private static final String FILE_NAME_FIELD = "file_name"; + private static final String FILE_LENGTH_FIELD = "file_length"; + + private static String buildId(CacheFile cacheFile) { + return buildId(cacheFile.getFile()); + } + + private static String buildId(Path path) { + return path.getFileName().toString(); + } + + private static Term buildTerm(CacheFile cacheFile) { + return buildTerm(buildId(cacheFile)); + } + + private static Term buildTerm(String cacheFileUuid) { + return new Term(CACHE_ID_FIELD, cacheFileUuid); + } + + private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFile cacheFile, SortedSet> cacheRanges) + throws IOException { + final Document document = new Document(); + document.add(new StringField(CACHE_ID_FIELD, buildId(cacheFile), Field.Store.YES)); + document.add(new StringField(CACHE_PATH_FIELD, nodePath.indicesPath.relativize(cacheFile.getFile()).toString(), Field.Store.YES)); + + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.writeVInt(cacheRanges.size()); + for (Tuple cacheRange : cacheRanges) { + output.writeVLong(cacheRange.v1()); + output.writeVLong(cacheRange.v2()); + } + output.flush(); + document.add(new StoredField(CACHE_RANGES_FIELD, output.bytes().toBytesRef())); + } + + final CacheKey cacheKey = cacheFile.getCacheKey(); + document.add(new StringField(FILE_NAME_FIELD, cacheKey.getFileName(), Field.Store.YES)); + document.add(new StringField(FILE_LENGTH_FIELD, 
Long.toString(cacheFile.getLength()), Field.Store.YES)); + + final SnapshotId snapshotId = cacheKey.getSnapshotId(); + document.add(new StringField(SNAPSHOT_NAME_FIELD, snapshotId.getName(), Field.Store.YES)); + document.add(new StringField(SNAPSHOT_ID_FIELD, snapshotId.getUUID(), Field.Store.YES)); + + final IndexId indexId = cacheKey.getIndexId(); + document.add(new StringField(INDEX_NAME_FIELD, indexId.getName(), Field.Store.YES)); + document.add(new StringField(INDEX_ID_FIELD, indexId.getId(), Field.Store.YES)); + + final ShardId shardId = cacheKey.getShardId(); + document.add(new StringField(SHARD_INDEX_NAME_FIELD, shardId.getIndex().getName(), Field.Store.YES)); + document.add(new StringField(SHARD_INDEX_ID_FIELD, shardId.getIndex().getUUID(), Field.Store.YES)); + document.add(new StringField(SHARD_ID_FIELD, Integer.toString(shardId.getId()), Field.Store.YES)); + + return document; + } + + private static String getValue(Document document, String fieldName) { + final String value = document.get(fieldName); + assert value != null : "no value found for field [" + fieldName + "] and document [" + document + ']'; + return value; + } + + private static CacheKey buildCacheKey(Document document) { + return new CacheKey( + new SnapshotId(getValue(document, SNAPSHOT_NAME_FIELD), getValue(document, SNAPSHOT_ID_FIELD)), + new IndexId(getValue(document, INDEX_NAME_FIELD), getValue(document, INDEX_ID_FIELD)), + new ShardId( + new Index(getValue(document, SHARD_INDEX_NAME_FIELD), getValue(document, SHARD_INDEX_ID_FIELD)), + Integer.parseInt(getValue(document, SHARD_ID_FIELD)) + ), + getValue(document, FILE_NAME_FIELD) + ); + } + + private static long getFileLength(Document document) { + final String fileLength = getValue(document, FILE_LENGTH_FIELD); + assert fileLength != null; + return Long.parseLong(fileLength); + } + + private static SortedSet> buildCacheFileRanges(Document document) throws IOException { + final BytesRef cacheRangesBytesRef = 
document.getBinaryValue(CACHE_RANGES_FIELD); + assert cacheRangesBytesRef != null; + + final SortedSet> cacheRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + try (StreamInput input = new ByteBufferStreamInput(ByteBuffer.wrap(cacheRangesBytesRef.bytes))) { + final int length = input.readVInt(); + assert length > 0 : "empty cache ranges"; + Tuple previous = null; + for (int i = 0; i < length; i++) { + final Tuple range = Tuple.tuple(input.readVLong(), input.readVLong()); + assert range.v1() < range.v2() : range; + assert range.v2() <= getFileLength(document); + assert previous == null || previous.v2() < range.v1(); + + final boolean added = cacheRanges.add(range); + assert added : range + " already exist in " + cacheRanges; + previous = range; + } + } + return unmodifiableSortedSet(cacheRanges); + } + + static Path resolveCacheIndexFolder(NodeEnvironment.NodePath nodePath) { + return resolveCacheIndexFolder(nodePath.path); + } + + static Path resolveCacheIndexFolder(Path dataPath) { + return CacheService.resolveSnapshotCache(dataPath); + } + + /** + * Creates a directory for the snapshot cache Lucene index. 
+ */ + private static Path createCacheIndexFolder(NodeEnvironment.NodePath nodePath) throws IOException { + // "snapshot_cache" directory at the root of the specified data path + final Path snapshotCacheRootDir = resolveCacheIndexFolder(nodePath); + if (Files.exists(snapshotCacheRootDir) == false) { + logger.debug("creating new persistent cache index directory [{}]", snapshotCacheRootDir); + Files.createDirectories(snapshotCacheRootDir); + } + return snapshotCacheRootDir; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index fb57bc3cd53a3..2a521ec1e3e4d 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -38,8 +37,8 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.List; @@ -52,6 +51,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; import 
static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -604,14 +604,9 @@ private void executeTestCase( final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion); final List files = Collections.singletonList(new FileInfo(blobName, metadata, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( CacheService ignored = cacheService; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 4ec11b9c9ebac..b7dbd77873577 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -126,6 +126,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static 
org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -685,14 +686,9 @@ public void testClearCache() throws Exception { final IndexId indexId = new IndexId("_id", "_uuid"); final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 393cd0aeb3178..65db1837587b0 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -214,7 +214,7 @@ public void testFSync() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(100, 1000), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ 
-261,7 +261,7 @@ public void testFSyncOnEvictedFile() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ -315,7 +315,7 @@ public void testFSyncFailure() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 8d99956b03602..65a4b953d8c6e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; @@ -64,6 +65,10 @@ public final class TestUtils { private TestUtils() {} public static SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { + return randomPopulateAndReads(cacheFile, (fileChannel, aLong, aLong2) -> {}); + } + + public static SortedSet> randomPopulateAndReads(CacheFile cacheFile, TriConsumer consumer) { final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); final List> futures = new ArrayList<>(); final 
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( @@ -71,11 +76,12 @@ public static SortedSet> randomPopulateAndReads(final CacheFil random() ); for (int i = 0; i < between(0, 10); i++) { - final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); - final long end = randomLongBetween(start + 1L, cacheFile.getLength()); + final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); + final long end = randomLongBetween(Math.min(start + 1L, cacheFile.getLength()), cacheFile.getLength()); final Tuple range = Tuple.tuple(start, end); futures.add( cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { + consumer.apply(channel, from, to); ranges.add(Tuple.tuple(from, to)); progressUpdater.accept(to); }, deterministicTaskQueue.getThreadPool().generic()) @@ -144,6 +150,13 @@ public static SortedSet> mergeContiguousRanges(final SortedSet }); } + public static void assertCacheFileEquals(CacheFile expected, CacheFile actual) { + assertThat(actual.getLength(), equalTo(expected.getLength())); + assertThat(actual.getFile(), equalTo(expected.getFile())); + assertThat(actual.getCacheKey(), equalTo(expected.getCacheKey())); + assertThat(actual.getCompletedRanges(), equalTo(expected.getCompletedRanges())); + } + public static void assertCounter(IndexInputStats.Counter counter, long total, long count, long min, long max) { assertThat(counter.total(), equalTo(total)); assertThat(counter.count(), equalTo(count)); @@ -350,7 +363,7 @@ public Integer getNumberOfFSyncs(Path path) { @Override public FileChannel newFileChannel(Path path, Set options, FileAttribute... 
attrs) throws IOException { final AtomicInteger counter = files.computeIfAbsent(path, p -> new AtomicInteger(0)); - return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { + return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 37b9db47f28b1..56a7ff3a6c19b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -36,9 +36,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.junit.After; import org.junit.Before; +import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -85,7 +87,7 @@ public void tearDownTest() throws Exception { * @return a new {@link CacheService} instance configured with default settings */ protected CacheService defaultCacheService() { - return new CacheService(Settings.EMPTY, clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(Settings.EMPTY, clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -105,7 +107,7 @@ protected CacheService randomCacheService() { TimeValue.timeValueSeconds(scaledRandomIntBetween(1, 120)) ); } - return new CacheService(cacheSettings.build(), clusterService, 
threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(cacheSettings.build(), clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -119,11 +121,16 @@ protected CacheService createCacheService(final ByteSizeValue cacheSize, final B .build(), clusterService, threadPool, - AbstractSearchableSnapshotsTestCase::noOpCacheCleaner + new PersistentCache(nodeEnvironment) ); } - protected static void noOpCacheCleaner() {} + /** + * Returns a random shard data path for the specified {@link ShardId}. The returned path can be located on any of the data node paths. + */ + protected Path randomShardPath(ShardId shardId) { + return randomFrom(nodeEnvironment.availableShardPaths(shardId)); + } /** * @return a random {@link ByteSizeValue} that can be used to set {@link CacheService#SNAPSHOT_CACHE_SIZE_SETTING}. diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 8a1fc0847df62..6e42b6a1abf05 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; @@ -36,12 +37,14 @@ import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; +import static 
org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +@LuceneTestCase.SuppressFileSystems("ExtrasFS") // we don't want extra empty dirs in snapshot cache root dirs public class CacheServiceTests extends AbstractSearchableSnapshotsTestCase { private static FSyncTrackingFileSystemProvider fileSystemProvider; @@ -67,11 +70,11 @@ public void testCacheSynchronization() throws Exception { logger.debug("--> creating shard cache directories on disk"); final Path[] shardsCacheDirs = new Path[numShards]; for (int i = 0; i < numShards; i++) { - final Path shardDataPath = randomFrom(nodeEnvironment.availableShardPaths(new ShardId(index, i))); + final Path shardDataPath = randomShardPath(new ShardId(index, i)); assertFalse(Files.exists(shardDataPath)); logger.debug("--> creating directories [{}] for shard [{}]", shardDataPath.toAbsolutePath(), i); - shardsCacheDirs[i] = Files.createDirectories(CacheService.resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); + shardsCacheDirs[i] = Files.createDirectories(resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); } try (CacheService cacheService = defaultCacheService()) { @@ -108,7 +111,7 @@ public void testCacheSynchronization() throws Exception { final ShardId shardId = new ShardId(index, randomIntBetween(0, numShards - 1)); final String fileName = String.format(Locale.ROOT, "file_%d_%d", iteration, i); final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, fileName); - final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(1, 10_000), shardsCacheDirs[shardId.id()]); + final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(0, 10_000), shardsCacheDirs[shardId.id()]); final 
CacheFile.EvictionListener listener = evictedCacheFile -> {}; cacheFile.acquire(listener); @@ -172,7 +175,6 @@ public void testCacheSynchronization() throws Exception { } public void testPut() throws Exception { - final Path cacheDir = createTempDir(); try (CacheService cacheService = defaultCacheService()) { final long fileLength = randomLongBetween(0L, 1000L); final CacheKey cacheKey = new CacheKey( @@ -181,6 +183,10 @@ public void testPut() throws Exception { new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), randomInt(5)), randomAlphaOfLength(105).toLowerCase(Locale.ROOT) ); + + final Path cacheDir = Files.createDirectories( + resolveSnapshotCache(randomShardPath(cacheKey.getShardId())).resolve(cacheKey.getSnapshotId().getUUID()) + ); final String cacheFileUuid = UUIDs.randomBase64UUID(random()); final SortedSet> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet(); @@ -205,6 +211,7 @@ public void testPut() throws Exception { FileNotFoundException.class, () -> cacheService.put(cacheKey, fileLength, cacheDir, cacheFileUuid, cacheFileRanges) ); + cacheService.start(); assertThat(exception.getMessage(), containsString(cacheFileUuid)); } } @@ -214,6 +221,7 @@ public void testRunIfShardMarkedAsEvictedInCache() throws Exception { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())); final CacheService cacheService = defaultCacheService(); cacheService.setCacheSyncInterval(TimeValue.ZERO); @@ -230,7 +238,7 @@ public void testRunIfShardMarkedAsEvictedInCache() 
throws Exception { final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); - final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, createTempDir()); + final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, cacheDir); cacheFile.acquire(evictionListener); cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java new file mode 100644 index 0000000000000..378d9f9bb83ec --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.stream.Collectors; + +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; +import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; +import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static 
org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase { + + public void testCacheIndexWriter() throws Exception { + final NodeEnvironment.NodePath nodePath = randomFrom(nodeEnvironment.nodePaths()); + + int docId = 0; + final Map liveDocs = new HashMap<>(); + final Set deletedDocs = new HashSet<>(); + + for (int iter = 0; iter < 20; iter++) { + + final Path snapshotCacheIndexDir = resolveCacheIndexFolder(nodePath); + assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0)); + + // load existing documents from persistent cache index before each iteration + final Map documents = PersistentCache.loadDocuments(nodeEnvironment); + assertThat(documents.size(), equalTo(liveDocs.size())); + + try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) { + assertThat(writer.nodePath(), sameInstance(nodePath)); + + // verify that existing documents are loaded + for (Map.Entry liveDoc : liveDocs.entrySet()) { + final Document document = documents.get(liveDoc.getKey()); + assertThat("Document should be loaded", document, notNullValue()); + final String iteration = document.get("update_iteration"); + assertThat(iteration, equalTo(String.valueOf(liveDoc.getValue()))); + writer.updateCacheFile(liveDoc.getKey(), document); + } + + // verify that deleted documents are not loaded + for (String deletedDoc : deletedDocs) { + final Document document = documents.get(deletedDoc); + assertThat("Document should not be loaded", document, nullValue()); + } + + // random updates of existing documents + final Map updatedDocs = new HashMap<>(); + for (String cacheId : randomSubsetOf(liveDocs.keySet())) { + final Document document = new Document(); + document.add(new StringField("cache_id", cacheId, Field.Store.YES)); + document.add(new StringField("update_iteration", String.valueOf(iter), 
Field.Store.YES)); + writer.updateCacheFile(cacheId, document); + + updatedDocs.put(cacheId, iter); + } + + // create new random documents + final Map newDocs = new HashMap<>(); + for (int i = 0; i < between(1, 10); i++) { + final String cacheId = String.valueOf(docId++); + final Document document = new Document(); + document.add(new StringField("cache_id", cacheId, Field.Store.YES)); + document.add(new StringField("update_iteration", String.valueOf(iter), Field.Store.YES)); + writer.updateCacheFile(cacheId, document); + + newDocs.put(cacheId, iter); + } + + // deletes random documents + final Map removedDocs = new HashMap<>(); + for (String cacheId : randomSubsetOf(Sets.union(liveDocs.keySet(), newDocs.keySet()))) { + writer.deleteCacheFile(cacheId); + + removedDocs.put(cacheId, iter); + } + + boolean commit = false; + if (frequently()) { + writer.prepareCommit(); + if (frequently()) { + writer.commit(); + commit = true; + } + } + + if (commit) { + liveDocs.putAll(updatedDocs); + liveDocs.putAll(newDocs); + for (String cacheId : removedDocs.keySet()) { + liveDocs.remove(cacheId); + deletedDocs.add(cacheId); + } + } + } + } + } + + public void testRepopulateCache() throws Exception { + final CacheService cacheService = defaultCacheService(); + cacheService.setCacheSyncInterval(TimeValue.ZERO); + cacheService.start(); + + final List cacheFiles = generateRandomCacheFiles(cacheService); + cacheService.synchronizeCache(); + + if (cacheFiles.isEmpty() == false) { + final List removedCacheFiles = randomSubsetOf(cacheFiles); + for (CacheFile removedCacheFile : removedCacheFiles) { + if (randomBoolean()) { + // evict cache file from the cache + cacheService.removeFromCache(removedCacheFile.getCacheKey()); + } else { + IOUtils.rm(removedCacheFile.getFile()); + } + cacheFiles.remove(removedCacheFile); + } + } + cacheService.stop(); + + final CacheService newCacheService = defaultCacheService(); + newCacheService.start(); + for (CacheFile cacheFile : cacheFiles) { + CacheFile 
newCacheFile = newCacheService.get(cacheFile.getCacheKey(), cacheFile.getLength(), cacheFile.getFile().getParent()); + assertThat(newCacheFile, notNullValue()); + assertThat(newCacheFile, not(sameInstance(cacheFile))); + assertCacheFileEquals(newCacheFile, cacheFile); + } + newCacheService.stop(); + } + + public void testCleanUp() throws Exception { + final List cacheFiles; + try (CacheService cacheService = defaultCacheService()) { + cacheService.start(); + cacheFiles = generateRandomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); + if (randomBoolean()) { + cacheService.synchronizeCache(); + } + } + + final Settings nodeSettings = Settings.builder() + .put(NODE_ROLES_SETTING.getKey(), randomValueOtherThan(DATA_ROLE, () -> randomFrom(BUILT_IN_ROLES)).roleName()) + .build(); + + assertTrue(cacheFiles.stream().allMatch(Files::exists)); + PersistentCache.cleanUp(nodeSettings, nodeEnvironment); + assertTrue(cacheFiles.stream().noneMatch(Files::exists)); + } + + /** + * Generates 1 or more cache files using the specified {@link CacheService}. 
+ */ + private List generateRandomCacheFiles(CacheService cacheService) throws Exception { + final byte[] buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + + final List cacheFiles = new ArrayList<>(); + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { + SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int indices = 0; indices < between(1, 2); indices++) { + IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int shards = 0; shards < between(1, 2); shards++) { + ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); + + final Path cacheDir = Files.createDirectories( + CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID()) + ); + + for (int files = 0; files < between(1, 2); files++) { + final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, "file_" + files); + final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + try { + SortedSet> ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { + try { + channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + if (ranges.isEmpty() == false) { + cacheFiles.add(cacheFile); + } + } finally { + cacheFile.release(listener); + } + } + } + } + } + return cacheFiles; + } +} From 671a78ec107c8114277e70319f5c4e0dcbdbbb0e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 18:17:51 +0100 Subject: [PATCH 2/2] fix --- .../xpack/searchablesnapshots/cache/PersistentCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index b656a2b40979a..21363b3094e14 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -158,7 +158,7 @@ void repopulateCache(CacheService cacheService) { if (Files.isDirectory(shardCachePath)) { logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); - Files.walkFileTree(shardCachePath, new SimpleFileVisitor<>() { + Files.walkFileTree(shardCachePath, new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { try {