diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java index 186dfcc9a5199..ec299b36b443b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRepository.java @@ -86,7 +86,7 @@ private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer); if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) { final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); - directory = new CacheDirectory(directory, cacheService, cacheDir); + directory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardPath.getShardId()); } directory = new InMemoryNoOpCommitDirectory(directory); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java index 8ac42411ace59..24088dbc9481c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java @@ -15,6 +15,9 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import java.io.EOFException; import java.io.IOException; @@ -34,19 +37,30 @@ public class CacheDirectory extends FilterDirectory { private static final int COPY_BUFFER_SIZE = 8192; private final CacheService cacheService; + private final SnapshotId snapshotId; + private final IndexId indexId; + private final ShardId shardId; private final Path cacheDir; - public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir) throws IOException { + public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId) + throws IOException { super(in); this.cacheService = Objects.requireNonNull(cacheService); this.cacheDir = Files.createDirectories(cacheDir); + this.snapshotId = Objects.requireNonNull(snapshotId); + this.indexId = Objects.requireNonNull(indexId); + this.shardId = Objects.requireNonNull(shardId); + } + + private CacheKey createCacheKey(String fileName) { + return new CacheKey(snapshotId, indexId, shardId, fileName); } public void close() throws IOException { super.close(); // Ideally we could let the cache evict/remove cached files by itself after the // directory has been closed. - cacheService.removeFromCache(key -> key.startsWith(cacheDir.toString())); + cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); } @Override @@ -57,12 +71,12 @@ public IndexInput openInput(final String name, final IOContext context) throws I private class CacheFileReference implements CacheFile.EvictionListener { - private final String fileName; private final long fileLength; + private final CacheKey cacheKey; private final AtomicReference cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired private CacheFileReference(String fileName, long fileLength) { - this.fileName = fileName; + this.cacheKey = createCacheKey(fileName); this.fileLength = fileLength; } @@ -73,7 +87,7 @@ CacheFile get() throws Exception { return currentCacheFile; } - final CacheFile newCacheFile = cacheService.get(fileName, fileLength, cacheDir); + final CacheFile newCacheFile = cacheService.get(cacheKey, fileLength, cacheDir); synchronized (this) { currentCacheFile = cacheFile.get(); if (currentCacheFile != null) { @@ -89,7 +103,7 @@ CacheFile get() throws Exception { } String getFileName() { - return fileName; + return cacheKey.getFileName(); } @Override @@ -113,7 +127,7 @@ void releaseOnClose() { @Override public String toString() { return "CacheFileReference{" + - "fileName='" + fileName + '\'' + + "cacheKey='" + cacheKey + '\'' + ", fileLength=" + fileLength + ", cacheDir=" + cacheDir + ", acquired=" + (cacheFile.get() != null) + diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java index 016afa8529409..5d418169baf66 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java @@ -50,7 +50,7 @@ protected void closeInternal() { private final SparseFileTracker tracker; private final int rangeSize; - private final String name; + private final String description; private final Path file; private volatile Set listeners; @@ -59,9 +59,9 @@ protected void closeInternal() { @Nullable // if evicted, or there are no listeners private volatile FileChannel channel; - CacheFile(String name, long length, Path file, int rangeSize) { + CacheFile(String description, long length, Path file, int rangeSize) { this.tracker = new SparseFileTracker(file.toString(), length); - this.name = Objects.requireNonNull(name); + this.description = Objects.requireNonNull(description); this.file = Objects.requireNonNull(file); this.listeners = new HashSet<>(); this.rangeSize = rangeSize; @@ -219,7 +219,7 @@ private boolean invariant() { @Override public String toString() { return "CacheFile{" + - "name='" + name + '\'' + + "desc='" + description + '\'' + ", file=" + file + ", length=" + tracker.getLength() + ", channel=" + (channel != null ? "yes" : "no") + diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java new file mode 100644 index 0000000000000..ebbdc8e72d350 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; + +import java.util.Objects; + +public class CacheKey { + + private final SnapshotId snapshotId; + private final IndexId indexId; + private final ShardId shardId; + private final String fileName; + + CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String fileName) { + this.snapshotId = Objects.requireNonNull(snapshotId); + this.indexId = Objects.requireNonNull(indexId); + this.shardId = Objects.requireNonNull(shardId); + this.fileName = Objects.requireNonNull(fileName); + } + + SnapshotId getSnapshotId() { + return snapshotId; + } + + IndexId getIndexId() { + return indexId; + } + + ShardId getShardId() { + return shardId; + } + + String getFileName() { + return fileName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CacheKey cacheKey = (CacheKey) o; + return Objects.equals(snapshotId, cacheKey.snapshotId) + && Objects.equals(indexId, cacheKey.indexId) + && Objects.equals(shardId, cacheKey.shardId) + && Objects.equals(fileName, cacheKey.fileName); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotId, indexId, shardId, fileName); + } + + @Override + public String toString() { + return "[" + + "snapshotId=" + snapshotId + + ", indexId=" + indexId + + ", shardId=" + shardId + + ", fileName='" + fileName + '\'' + + ']'; + } + + boolean belongsTo(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { + return Objects.equals(this.snapshotId, snapshotId) + && Objects.equals(this.indexId, indexId) + && Objects.equals(this.shardId, shardId); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index dee090f82cb0d..946e3fff254d6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -40,7 +40,7 @@ public class CacheService extends AbstractLifecycleComponent { new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // max Setting.Property.NodeScope); - private final Cache cache; + private final Cache cache; private final ByteSizeValue cacheSize; private final ByteSizeValue rangeSize; @@ -52,7 +52,7 @@ public CacheService(final Settings settings) { CacheService(final ByteSizeValue cacheSize, final ByteSizeValue rangeSize) { this.cacheSize = Objects.requireNonNull(cacheSize); this.rangeSize = Objects.requireNonNull(rangeSize); - this.cache = CacheBuilder.builder() + this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) .weigher((key, entry) -> entry.getLength()) // NORELEASE This does not immediately free space on disk, as cache file are only deleted when all index inputs @@ -96,9 +96,9 @@ int getRangeSize() { return Math.toIntExact(rangeSize.getBytes()); } - public CacheFile get(final String fileName, final long length, final Path cacheDir) throws Exception { + public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path cacheDir) throws Exception { ensureLifecycleStarted(); - return cache.computeIfAbsent(toCacheKey(cacheDir, fileName), key -> { + return cache.computeIfAbsent(cacheKey, key -> { ensureLifecycleStarted(); // generate a random UUID for the name of the cache file on disk final String uuid = UUIDs.randomBase64UUID(); @@ -106,29 +106,21 @@ public CacheFile get(final String fileName, final long length, final Path cacheD final Path path = cacheDir.resolve(uuid); assert Files.notExists(path) : "cache file already exists " + path; - return new CacheFile(fileName, length, path, getRangeSize()); + return new CacheFile(key.toString(), fileLength, path, getRangeSize()); }); } /** - * Remove from cache all entries that match the given predicate. + * Invalidate cache entries with keys matching the given predicate * * @param predicate the predicate to evaluate */ - void removeFromCache(final Predicate predicate) { - for (String cacheKey : cache.keys()) { + void removeFromCache(final Predicate predicate) { + for (CacheKey cacheKey : cache.keys()) { if (predicate.test(cacheKey)) { cache.invalidate(cacheKey); } } cache.refresh(); } - - /** - * Computes the cache key associated to the given Lucene cached file - */ - private static String toCacheKey(final Path cacheDir, String fileName) { - // TODO Fix this. Cache Key should be computed from snapshot id/index id/shard - return cacheDir.resolve(fileName).toAbsolutePath().toString(); - } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java index e570b6f7f3b14..7dea944d754ef 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java @@ -13,10 +13,14 @@ import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.Objects; import java.util.concurrent.atomic.LongAdder; @@ -28,6 +32,10 @@ public void testRandomReads() throws IOException { try (CacheService cacheService = createCacheService()) { cacheService.start(); + SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); + IndexId indexId = new IndexId("_name", "_uuid"); + ShardId shardId = new ShardId("_name", "_uuid", 0); + for (int i = 0; i < 5; i++) { final String fileName = randomAlphaOfLength(10); final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); @@ -37,7 +45,8 @@ public void testRandomReads() throws IOException { directory = new CountingDirectory(directory, cacheService.getRangeSize()); } - try (CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, createTempDir())) { + final Path cacheDir = createTempDir(); + try (CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardId)) { try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) { assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKeyTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKeyTests.java new file mode 100644 index 0000000000000..6b1a1f8daf5ce --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKeyTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; + +import static org.hamcrest.Matchers.equalTo; + +public class CacheKeyTests extends ESTestCase { + + public void testEqualsAndHashCode() { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(createInstance(), this::copy, this::mutate); + } + + public void testBelongsTo() { + final CacheKey cacheKey = createInstance(); + + SnapshotId snapshotId = cacheKey.getSnapshotId(); + IndexId indexId = cacheKey.getIndexId(); + ShardId shardId = cacheKey.getShardId(); + + final boolean belongsTo; + switch (randomInt(2)) { + case 0: + snapshotId = randomValueOtherThan(cacheKey.getSnapshotId(), this::randomSnapshotId); + belongsTo = false; + break; + case 1: + indexId = randomValueOtherThan(cacheKey.getIndexId(), this::randomIndexId); + belongsTo = false; + break; + case 2: + shardId = randomValueOtherThan(cacheKey.getShardId(), this::randomShardId); + belongsTo = false; + break; + case 3: + belongsTo = true; + break; + default: + throw new AssertionError("Unsupported value"); + } + + assertThat(cacheKey.belongsTo(snapshotId, indexId, shardId), equalTo(belongsTo)); + } + + private SnapshotId randomSnapshotId() { + return new SnapshotId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + + private IndexId randomIndexId() { + return new IndexId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + + private ShardId randomShardId() { + return new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10), randomInt(5)); + } + + private CacheKey createInstance() { + return new CacheKey(randomSnapshotId(), randomIndexId(), randomShardId(), randomAlphaOfLengthBetween(5, 10)); + } + + private CacheKey copy(final CacheKey origin) { + SnapshotId snapshotId = origin.getSnapshotId(); + if (randomBoolean()) { + snapshotId = new SnapshotId(origin.getSnapshotId().getName(), origin.getSnapshotId().getUUID()); + } + IndexId indexId = origin.getIndexId(); + if (randomBoolean()) { + indexId = new IndexId(origin.getIndexId().getName(), origin.getIndexId().getId()); + } + ShardId shardId = origin.getShardId(); + if (randomBoolean()) { + shardId = new ShardId(new Index(shardId.getIndex().getName(), shardId.getIndex().getUUID()), shardId.id()); + } + return new CacheKey(snapshotId, indexId, shardId, origin.getFileName()); + } + + private CacheKey mutate(CacheKey origin) { + SnapshotId snapshotId = origin.getSnapshotId(); + IndexId indexId = origin.getIndexId(); + ShardId shardId = origin.getShardId(); + String fileName = origin.getFileName(); + + switch (randomInt(3)) { + case 0: + snapshotId = randomValueOtherThan(origin.getSnapshotId(), this::randomSnapshotId); + break; + case 1: + indexId = randomValueOtherThan(origin.getIndexId(), this::randomIndexId); + break; + case 2: + shardId = randomValueOtherThan(origin.getShardId(), this::randomShardId); + break; + case 3: + fileName = randomValueOtherThan(origin.getFileName(), () -> randomAlphaOfLengthBetween(5, 10)); + break; + default: + throw new AssertionError("Unsupported mutation"); + } + return new CacheKey(snapshotId, indexId, shardId, fileName); + } +}