@@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
@@ -14,6 +17,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
@@ -25,6 +29,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -34,8 +39,10 @@
*/
public class CacheDirectory extends FilterDirectory {

private static final Logger logger = LogManager.getLogger(CacheDirectory.class);
private static final int COPY_BUFFER_SIZE = 8192;

private final Map<String, IndexInputStats> stats;
private final CacheService cacheService;
private final SnapshotId snapshotId;
private final IndexId indexId;
@@ -45,6 +52,7 @@ public class CacheDirectory extends FilterDirectory {
public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId)
throws IOException {
super(in);
this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.cacheService = Objects.requireNonNull(cacheService);
this.cacheDir = Files.createDirectories(cacheDir);
this.snapshotId = Objects.requireNonNull(snapshotId);
@@ -56,6 +64,11 @@ private CacheKey createCacheKey(String fileName) {
return new CacheKey(snapshotId, indexId, shardId, fileName);
}

// pkg private for tests
@Nullable IndexInputStats getStats(String name) {
return stats.get(name);
}

public void close() throws IOException {
super.close();
// Ideally we could let the cache evict/remove cached files by itself after the
@@ -66,7 +79,8 @@ public void close() throws IOException {
@Override
public IndexInput openInput(final String name, final IOContext context) throws IOException {
ensureOpen();
return new CacheBufferedIndexInput(name, fileLength(name), context);
final long fileLength = fileLength(name);
return new CacheBufferedIndexInput(name, fileLength, context, stats.computeIfAbsent(name, n -> new IndexInputStats(fileLength)));
}
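
For illustration, a hypothetical test could exercise the per-file stats wired up in openInput above (the file name and assertion are made up for this example and are not part of the change):

    // opening an input registers an IndexInputStats entry for that file name
    try (IndexInput input = cacheDirectory.openInput("_0.cfs", IOContext.DEFAULT)) {
        input.readBytes(new byte[16], 0, 16);  // reads are then accounted against that entry
    }
    assertThat(cacheDirectory.getStats("_0.cfs"), notNullValue());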

private class CacheFileReference implements CacheFile.EvictionListener {
@@ -141,22 +155,28 @@ public class CacheBufferedIndexInput extends BufferedIndexInput {
private final long offset;
private final long end;
private final CacheFileReference cacheFileReference;
private final IndexInputStats stats;

// the following are only mutable so they can be adjusted after cloning
private AtomicBoolean closed;
private boolean isClone;

CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext) {
this(new CacheFileReference(fileName, fileLength), ioContext,
// last read position is kept around in order to detect (non)contiguous reads for stats
private long lastReadPosition;

CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext, IndexInputStats stats) {
this(new CacheFileReference(fileName, fileLength), ioContext, stats,
"CachedBufferedIndexInput(" + fileName + ")", 0L, fileLength, false);
stats.incrementOpenCount();
}

private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext ioContext, String desc, long offset, long length,
boolean isClone) {
private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext ioContext, IndexInputStats stats,
String desc, long offset, long length, boolean isClone) {
super(desc, ioContext);
this.ioContext = ioContext;
this.offset = offset;
this.cacheFileReference = cacheFileReference;
this.stats = stats;
this.end = offset + length;
this.closed = new AtomicBoolean(false);
this.isClone = isClone;
@@ -171,6 +191,7 @@ public long length() {
public void close() {
if (closed.compareAndSet(false, true)) {
if (isClone == false) {
stats.incrementCloseCount();
cacheFileReference.releaseOnClose();
}
}
@@ -180,20 +201,21 @@ public void close() {
protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
final long position = getFilePointer() + this.offset;

int bytesRead = 0;
while (bytesRead < length) {
final long pos = position + bytesRead;
final int off = offset + bytesRead;
final int len = length - bytesRead;
int totalBytesRead = 0;
while (totalBytesRead < length) {
final long pos = position + totalBytesRead;
final int off = offset + totalBytesRead;
final int len = length - totalBytesRead;

int bytesRead = 0;
try {
final CacheFile cacheFile = cacheFileReference.get();
if (cacheFile == null) {
throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
}

try (ReleasableLock ignored = cacheFile.fileLock()) {
bytesRead += cacheFile.fetchRange(pos,
bytesRead = cacheFile.fetchRange(pos,
(start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
(start, end) -> writeCacheFile(cacheFile.getChannel(), start, end))
.get();
@@ -202,33 +224,43 @@ protected void readInternal(final byte[] buffer, final int offset, final int len
if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
try {
// cache file was evicted during the range fetching, read bytes directly from source
bytesRead += readDirectly(pos, pos + len, buffer, off);
bytesRead = readDirectly(pos, pos + len, buffer, off);
continue;
} catch (Exception inner) {
e.addSuppressed(inner);
}
}
throw new IOException("Failed to read data from cache", e);

} finally {
totalBytesRead += bytesRead;
}
}
assert bytesRead == length : "partial read operation, read [" + bytesRead + "] bytes of [" + length + "]";
assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]";
stats.incrementBytesRead(lastReadPosition, position, totalBytesRead);
lastReadPosition = position + totalBytesRead;
}

int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
assert assertFileChannelOpen(fc);
return Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
stats.addCachedBytesRead(bytesRead);
return bytesRead;
}

@SuppressForbidden(reason = "Use positional writes on purpose")
void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
assert assertFileChannelOpen(fc);
final String fileName = cacheFileReference.getFileName();
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))];
try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) {
logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] of file [{}] to cache file", start, end, fileName));
Contributor:
Ah, I think this will not include the shard ID so we might struggle to interpret the logs when there are lots of shards to search. I think logging cacheFileReference itself gives us everything we need.

Member Author:
Makes sense, I pushed 9de8453
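
The referenced commit (9de8453) is not shown in this diff; a minimal sketch of the suggested change, assuming CacheFileReference#toString reports the full cache key (snapshot, index, shard and file name), could be:

    // hypothetical follow-up, not the actual commit: log the reference so the shard ID is visible
    logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));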


int bytesCopied = 0;
try (IndexInput input = in.openInput(fileName, ioContext)) {
stats.incrementInnerOpenCount();
if (start > 0) {
input.seek(start);
}
int bytesCopied = 0;
long remaining = end - start;
while (remaining > 0) {
final int size = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
@@ -237,6 +269,7 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
bytesCopied += size;
remaining -= size;
}
stats.addCachedBytesWritten(bytesCopied);
}
}

@@ -247,6 +280,7 @@ protected void seekInternal(long pos) throws IOException {
} else if (pos < 0L) {
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
}
stats.incrementSeeks(getFilePointer(), pos);
}

@Override
@@ -263,7 +297,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset
+ ",length=" + length + ",fileLength=" + this.length() + ": " + this);
}
return new CacheBufferedIndexInput(cacheFileReference, ioContext,
return new CacheBufferedIndexInput(cacheFileReference, ioContext, stats,
getFullSliceDescription(sliceDescription), this.offset + offset, length, true);
}

@@ -279,10 +313,13 @@ public String toString() {
}

private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException {
final String fileName = cacheFileReference.getFileName();
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))];
logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] from file [{}]", start, end, fileName));
Contributor:
Sorry, here too :)

Member Author:
👍


int bytesCopied = 0;
try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) {
try (IndexInput input = in.openInput(fileName, ioContext)) {
stats.incrementInnerOpenCount();
if (start > 0) {
input.seek(start);
}
@@ -294,6 +331,7 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
bytesCopied += len;
remaining -= len;
}
stats.addDirectBytesRead(bytesCopied);
}
return bytesCopied;
}
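
The IndexInputStats class itself is not included in this diff. As a rough sketch only (the method names are taken from the call sites above; the field layout, the contiguous-read check keyed off lastReadPosition, and the forward/backward seek split are assumptions), it could look roughly like:

    import java.util.concurrent.atomic.AtomicLong;

    public class IndexInputStats {

        private final long fileLength;

        private final AtomicLong openCount = new AtomicLong();
        private final AtomicLong closeCount = new AtomicLong();
        private final AtomicLong innerOpenCount = new AtomicLong();

        private final AtomicLong contiguousBytesRead = new AtomicLong();
        private final AtomicLong nonContiguousBytesRead = new AtomicLong();
        private final AtomicLong cachedBytesRead = new AtomicLong();
        private final AtomicLong cachedBytesWritten = new AtomicLong();
        private final AtomicLong directBytesRead = new AtomicLong();

        private final AtomicLong forwardSeeks = new AtomicLong();
        private final AtomicLong backwardSeeks = new AtomicLong();

        IndexInputStats(long fileLength) {
            this.fileLength = fileLength;
        }

        long getFileLength() {
            return fileLength;
        }

        void incrementOpenCount() {
            openCount.incrementAndGet();
        }

        void incrementCloseCount() {
            closeCount.incrementAndGet();
        }

        void incrementInnerOpenCount() {
            innerOpenCount.incrementAndGet();
        }

        // a read is counted as contiguous when it starts exactly where the previous read
        // ended, which is what lastReadPosition is tracked for in CacheBufferedIndexInput
        void incrementBytesRead(long previousReadEnd, long position, int bytesRead) {
            if (position == previousReadEnd) {
                contiguousBytesRead.addAndGet(bytesRead);
            } else {
                nonContiguousBytesRead.addAndGet(bytesRead);
            }
        }

        void addCachedBytesRead(int bytesRead) {
            cachedBytesRead.addAndGet(bytesRead);
        }

        void addCachedBytesWritten(int bytesWritten) {
            cachedBytesWritten.addAndGet(bytesWritten);
        }

        void addDirectBytesRead(int bytesRead) {
            directBytesRead.addAndGet(bytesRead);
        }

        // seeks to the current position or beyond count as forward, everything else as backward
        void incrementSeeks(long currentPosition, long newPosition) {
            if (newPosition >= currentPosition) {
                forwardSeeks.incrementAndGet();
            } else {
                backwardSeeks.incrementAndGet();
            }
        }
    }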