Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath
Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) {
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
directory = new CacheDirectory(directory, cacheService, cacheDir);
directory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardPath.getShardId());
}
directory = new InMemoryNoOpCommitDirectory(directory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -34,19 +37,30 @@ public class CacheDirectory extends FilterDirectory {
private static final int COPY_BUFFER_SIZE = 8192;

private final CacheService cacheService;
private final SnapshotId snapshotId;
private final IndexId indexId;
private final ShardId shardId;
private final Path cacheDir;

public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir) throws IOException {
public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId)
throws IOException {
super(in);
this.cacheService = Objects.requireNonNull(cacheService);
this.cacheDir = Files.createDirectories(cacheDir);
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indexId = Objects.requireNonNull(indexId);
this.shardId = Objects.requireNonNull(shardId);
}

private CacheKey createCacheKey(String fileName) {
return new CacheKey(snapshotId, indexId, shardId, fileName);
}

public void close() throws IOException {
super.close();
// Ideally we could let the cache evict/remove cached files by itself after the
// directory has been closed.
cacheService.removeFromCache(key -> key.startsWith(cacheDir.toString()));
cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId));
}

@Override
Expand All @@ -57,12 +71,12 @@ public IndexInput openInput(final String name, final IOContext context) throws I

private class CacheFileReference implements CacheFile.EvictionListener {

private final String fileName;
private final long fileLength;
private final CacheKey cacheKey;
private final AtomicReference<CacheFile> cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired

private CacheFileReference(String fileName, long fileLength) {
this.fileName = fileName;
this.cacheKey = createCacheKey(fileName);
this.fileLength = fileLength;
}

Expand All @@ -73,7 +87,7 @@ CacheFile get() throws Exception {
return currentCacheFile;
}

final CacheFile newCacheFile = cacheService.get(fileName, fileLength, cacheDir);
final CacheFile newCacheFile = cacheService.get(cacheKey, fileLength, cacheDir);
synchronized (this) {
currentCacheFile = cacheFile.get();
if (currentCacheFile != null) {
Expand All @@ -89,7 +103,7 @@ CacheFile get() throws Exception {
}

String getFileName() {
return fileName;
return cacheKey.getFileName();
}

@Override
Expand All @@ -113,7 +127,7 @@ void releaseOnClose() {
@Override
public String toString() {
return "CacheFileReference{" +
"fileName='" + fileName + '\'' +
"cacheKey='" + cacheKey + '\'' +
", fileLength=" + fileLength +
", cacheDir=" + cacheDir +
", acquired=" + (cacheFile.get() != null) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected void closeInternal() {

private final SparseFileTracker tracker;
private final int rangeSize;
private final String name;
private final String description;
private final Path file;

private volatile Set<EvictionListener> listeners;
Expand All @@ -59,9 +59,9 @@ protected void closeInternal() {
@Nullable // if evicted, or there are no listeners
private volatile FileChannel channel;

CacheFile(String name, long length, Path file, int rangeSize) {
CacheFile(String description, long length, Path file, int rangeSize) {
this.tracker = new SparseFileTracker(file.toString(), length);
this.name = Objects.requireNonNull(name);
this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file);
this.listeners = new HashSet<>();
this.rangeSize = rangeSize;
Expand Down Expand Up @@ -219,7 +219,7 @@ private boolean invariant() {
@Override
public String toString() {
return "CacheFile{" +
"name='" + name + '\'' +
"desc='" + description + '\'' +
", file=" + file +
", length=" + tracker.getLength() +
", channel=" + (channel != null ? "yes" : "no") +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;

import java.util.Objects;

public class CacheKey {

private final SnapshotId snapshotId;
private final IndexId indexId;
private final ShardId shardId;
private final String fileName;

CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String fileName) {
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indexId = Objects.requireNonNull(indexId);
this.shardId = Objects.requireNonNull(shardId);
this.fileName = Objects.requireNonNull(fileName);
}

SnapshotId getSnapshotId() {
return snapshotId;
}

IndexId getIndexId() {
return indexId;
}

ShardId getShardId() {
return shardId;
}

String getFileName() {
return fileName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CacheKey cacheKey = (CacheKey) o;
return Objects.equals(snapshotId, cacheKey.snapshotId)
&& Objects.equals(indexId, cacheKey.indexId)
&& Objects.equals(shardId, cacheKey.shardId)
&& Objects.equals(fileName, cacheKey.fileName);
}

@Override
public int hashCode() {
return Objects.hash(snapshotId, indexId, shardId, fileName);
}

@Override
public String toString() {
return "[" +
"snapshotId=" + snapshotId +
", indexId=" + indexId +
", shardId=" + shardId +
", fileName='" + fileName + '\'' +
']';
}

boolean belongsTo(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
return Objects.equals(this.snapshotId, snapshotId)
&& Objects.equals(this.indexId, indexId)
&& Objects.equals(this.shardId, shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class CacheService extends AbstractLifecycleComponent {
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // max
Setting.Property.NodeScope);

private final Cache<String, CacheFile> cache;
private final Cache<CacheKey, CacheFile> cache;
private final ByteSizeValue cacheSize;
private final ByteSizeValue rangeSize;

Expand All @@ -52,7 +52,7 @@ public CacheService(final Settings settings) {
CacheService(final ByteSizeValue cacheSize, final ByteSizeValue rangeSize) {
this.cacheSize = Objects.requireNonNull(cacheSize);
this.rangeSize = Objects.requireNonNull(rangeSize);
this.cache = CacheBuilder.<String, CacheFile>builder()
this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
.setMaximumWeight(cacheSize.getBytes())
.weigher((key, entry) -> entry.getLength())
// NORELEASE This does not immediately free space on disk, as cache file are only deleted when all index inputs
Expand Down Expand Up @@ -96,39 +96,31 @@ int getRangeSize() {
return Math.toIntExact(rangeSize.getBytes());
}

public CacheFile get(final String fileName, final long length, final Path cacheDir) throws Exception {
public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path cacheDir) throws Exception {
ensureLifecycleStarted();
return cache.computeIfAbsent(toCacheKey(cacheDir, fileName), key -> {
return cache.computeIfAbsent(cacheKey, key -> {
ensureLifecycleStarted();
// generate a random UUID for the name of the cache file on disk
final String uuid = UUIDs.randomBase64UUID();
// resolve the cache file on disk w/ the expected cached file
final Path path = cacheDir.resolve(uuid);
assert Files.notExists(path) : "cache file already exists " + path;

return new CacheFile(fileName, length, path, getRangeSize());
return new CacheFile(key.toString(), fileLength, path, getRangeSize());
});
}

/**
* Remove from cache all entries that match the given predicate.
* Invalidate cache entries with keys matching the given predicate
*
* @param predicate the predicate to evaluate
*/
void removeFromCache(final Predicate<String> predicate) {
for (String cacheKey : cache.keys()) {
void removeFromCache(final Predicate<CacheKey> predicate) {
for (CacheKey cacheKey : cache.keys()) {
if (predicate.test(cacheKey)) {
cache.invalidate(cacheKey);
}
}
cache.refresh();
}

/**
* Computes the cache key associated to the given Lucene cached file
*/
private static String toCacheKey(final Path cacheDir, String fileName) {
// TODO Fix this. Cache Key should be computed from snapshot id/index id/shard
return cacheDir.resolve(fileName).toAbsolutePath().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -28,6 +32,10 @@ public void testRandomReads() throws IOException {
try (CacheService cacheService = createCacheService()) {
cacheService.start();

SnapshotId snapshotId = new SnapshotId("_name", "_uuid");
IndexId indexId = new IndexId("_name", "_uuid");
ShardId shardId = new ShardId("_name", "_uuid", 0);

for (int i = 0; i < 5; i++) {
final String fileName = randomAlphaOfLength(10);
final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8);
Expand All @@ -37,7 +45,8 @@ public void testRandomReads() throws IOException {
directory = new CountingDirectory(directory, cacheService.getRangeSize());
}

try (CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, createTempDir())) {
final Path cacheDir = createTempDir();
try (CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardId)) {
try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) {
assertEquals(input.length, indexInput.length());
assertEquals(0, indexInput.getFilePointer());
Expand Down
Loading