diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java index 24088dbc9481c..2246244000d7f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; @@ -14,6 +17,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.Channels; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; @@ -25,6 +29,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -34,8 +39,10 @@ */ public class CacheDirectory extends FilterDirectory { + private static final Logger logger = LogManager.getLogger(CacheDirectory.class); private static final int COPY_BUFFER_SIZE = 8192; + private final Map stats; private final CacheService cacheService; private final SnapshotId snapshotId; private final IndexId indexId; @@ -45,6 +52,7 @@ public class CacheDirectory extends FilterDirectory { public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId) throws IOException { super(in); + this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); this.cacheService = Objects.requireNonNull(cacheService); this.cacheDir = Files.createDirectories(cacheDir); this.snapshotId = Objects.requireNonNull(snapshotId); @@ -56,6 +64,11 @@ private CacheKey createCacheKey(String fileName) { return new CacheKey(snapshotId, indexId, shardId, fileName); } + // pkg private for tests + @Nullable IndexInputStats getStats(String name) { + return stats.get(name); + } + public void close() throws IOException { super.close(); // Ideally we could let the cache evict/remove cached files by itself after the @@ -66,7 +79,8 @@ public void close() throws IOException { @Override public IndexInput openInput(final String name, final IOContext context) throws IOException { ensureOpen(); - return new CacheBufferedIndexInput(name, fileLength(name), context); + final long fileLength = fileLength(name); + return new CacheBufferedIndexInput(name, fileLength, context, stats.computeIfAbsent(name, n -> new IndexInputStats(fileLength))); } private class CacheFileReference implements CacheFile.EvictionListener { @@ -141,22 +155,28 @@ public class CacheBufferedIndexInput extends BufferedIndexInput { private final long offset; private final long end; private final CacheFileReference cacheFileReference; + private final IndexInputStats stats; // the following are only mutable so they can be adjusted after cloning private AtomicBoolean closed; private boolean isClone; - CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext) { - this(new CacheFileReference(fileName, fileLength), ioContext, + // last read position is kept around in order to detect (non)contiguous reads for stats + private long lastReadPosition; + + CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext, IndexInputStats stats) { + this(new CacheFileReference(fileName, fileLength), ioContext, stats, "CachedBufferedIndexInput(" + fileName + ")", 0L, fileLength, false); + stats.incrementOpenCount(); } - private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext ioContext, String desc, long offset, long length, - boolean isClone) { + private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext ioContext, IndexInputStats stats, + String desc, long offset, long length, boolean isClone) { super(desc, ioContext); this.ioContext = ioContext; this.offset = offset; this.cacheFileReference = cacheFileReference; + this.stats = stats; this.end = offset + length; this.closed = new AtomicBoolean(false); this.isClone = isClone; @@ -171,6 +191,7 @@ public long length() { public void close() { if (closed.compareAndSet(false, true)) { if (isClone == false) { + stats.incrementCloseCount(); cacheFileReference.releaseOnClose(); } } @@ -180,12 +201,13 @@ public void close() { protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException { final long position = getFilePointer() + this.offset; - int bytesRead = 0; - while (bytesRead < length) { - final long pos = position + bytesRead; - final int off = offset + bytesRead; - final int len = length - bytesRead; + int totalBytesRead = 0; + while (totalBytesRead < length) { + final long pos = position + totalBytesRead; + final int off = offset + totalBytesRead; + final int len = length - totalBytesRead; + int bytesRead = 0; try { final CacheFile cacheFile = cacheFileReference.get(); if (cacheFile == null) { @@ -193,7 +215,7 @@ protected void readInternal(final byte[] buffer, final int offset, final int len } try (ReleasableLock ignored = cacheFile.fileLock()) { - bytesRead += cacheFile.fetchRange(pos, + bytesRead = cacheFile.fetchRange(pos, (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len), (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)) .get(); @@ -202,7 +224,7 @@ protected void readInternal(final byte[] buffer, final int offset, final int len if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { try { // cache file was evicted during the range fetching, read bytes directly from source - bytesRead += readDirectly(pos, pos + len, buffer, off); + bytesRead = readDirectly(pos, pos + len, buffer, off); continue; } catch (Exception inner) { e.addSuppressed(inner); @@ -210,25 +232,34 @@ protected void readInternal(final byte[] buffer, final int offset, final int len } throw new IOException("Fail to read data from cache", e); + } finally { + totalBytesRead += bytesRead; } } - assert bytesRead == length : "partial read operation, read [" + bytesRead + "] bytes of [" + length + "]"; + assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]"; + stats.incrementBytesRead(lastReadPosition, position, totalBytesRead); + lastReadPosition = position + totalBytesRead; } int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException { assert assertFileChannelOpen(fc); - return Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); + int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); + stats.addCachedBytesRead(bytesRead); + return bytesRead; } @SuppressForbidden(reason = "Use positional writes on purpose") void writeCacheFile(FileChannel fc, long start, long end) throws IOException { assert assertFileChannelOpen(fc); final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))]; + logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference)); + + int bytesCopied = 0; try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) { + stats.incrementInnerOpenCount(); if (start > 0) { input.seek(start); } - int bytesCopied = 0; long remaining = end - start; while (remaining > 0) { final int size = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; @@ -237,6 +268,7 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException { bytesCopied += size; remaining -= size; } + stats.addCachedBytesWritten(bytesCopied); } } @@ -247,6 +279,7 @@ protected void seekInternal(long pos) throws IOException { } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } + stats.incrementSeeks(getFilePointer(), pos); } @Override @@ -263,7 +296,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) { throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length() + ": " + this); } - return new CacheBufferedIndexInput(cacheFileReference, ioContext, + return new CacheBufferedIndexInput(cacheFileReference, ioContext, stats, getFullSliceDescription(sliceDescription), this.offset + offset, length, true); } @@ -280,9 +313,12 @@ public String toString() { private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException { final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))]; + logger.trace(() -> + new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference)); int bytesCopied = 0; try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) { + stats.incrementInnerOpenCount(); if (start > 0) { input.seek(start); } @@ -294,6 +330,7 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws bytesCopied += len; remaining -= len; } + stats.addDirectBytesRead(bytesCopied); } return bytesCopied; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java new file mode 100644 index 0000000000000..f4a5d1d87cc89 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory.CacheBufferedIndexInput; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.LongConsumer; + +/** + * {@link IndexInputStats} records stats for a given {@link CacheBufferedIndexInput}. + */ +public class IndexInputStats { + + /* A threshold beyond which an index input seeking is counted as "large" */ + static final ByteSizeValue SEEKING_THRESHOLD = new ByteSizeValue(8, ByteSizeUnit.MB); + + private final long fileLength; + + private final LongAdder opened = new LongAdder(); + private final LongAdder inner = new LongAdder(); + private final LongAdder closed = new LongAdder(); + + private final Counter forwardSmallSeeks = new Counter(); + private final Counter backwardSmallSeeks = new Counter(); + + private final Counter forwardLargeSeeks = new Counter(); + private final Counter backwardLargeSeeks = new Counter(); + + private final Counter contiguousReads = new Counter(); + private final Counter nonContiguousReads = new Counter(); + + private final Counter directBytesRead = new Counter(); + + private final Counter cachedBytesRead = new Counter(); + private final Counter cachedBytesWritten = new Counter(); + + public IndexInputStats(long fileLength) { + this.fileLength = fileLength; + } + + public void incrementOpenCount() { + opened.increment(); + } + + public void incrementInnerOpenCount() { + inner.increment(); + } + + public void incrementCloseCount() { + closed.increment(); + } + + public void addCachedBytesRead(int bytesRead) { + cachedBytesRead.add(bytesRead); + } + + public void addCachedBytesWritten(int bytesWritten) { + cachedBytesWritten.add(bytesWritten); + } + + public void addDirectBytesRead(int bytesRead) { + directBytesRead.add(bytesRead); + } + + public void incrementBytesRead(long previousPosition, long currentPosition, int bytesRead) { + LongConsumer incBytesRead = (previousPosition == currentPosition) ? contiguousReads::add : nonContiguousReads::add; + incBytesRead.accept(bytesRead); + } + + public void incrementSeeks(long currentPosition, long newPosition) { + final long delta = newPosition - currentPosition; + if (delta == 0L) { + return; + } + final boolean isLarge = isLargeSeek(delta); + if (delta > 0) { + if (isLarge) { + forwardLargeSeeks.add(delta); + } else { + forwardSmallSeeks.add(delta); + } + } else { + if (isLarge) { + backwardLargeSeeks.add(delta); + } else { + backwardSmallSeeks.add(delta); + } + } + } + + long getFileLength() { + return fileLength; + } + + LongAdder getOpened() { + return opened; + } + + LongAdder getInnerOpened() { + return inner; + } + + LongAdder getClosed() { + return closed; + } + + Counter getForwardSmallSeeks() { + return forwardSmallSeeks; + } + + Counter getBackwardSmallSeeks() { + return backwardSmallSeeks; + } + + Counter getForwardLargeSeeks() { + return forwardLargeSeeks; + } + + Counter getBackwardLargeSeeks() { + return backwardLargeSeeks; + } + + Counter getContiguousReads() { + return contiguousReads; + } + + Counter getNonContiguousReads() { + return nonContiguousReads; + } + + Counter getDirectBytesRead() { + return directBytesRead; + } + + Counter getCachedBytesRead() { + return cachedBytesRead; + } + + Counter getCachedBytesWritten() { + return cachedBytesWritten; + } + + @SuppressForbidden(reason = "Handles Long.MIN_VALUE before using Math.abs()") + boolean isLargeSeek(long delta) { + return delta != Long.MIN_VALUE && Math.abs(delta) > SEEKING_THRESHOLD.getBytes(); + } + + static class Counter { + + private final LongAdder count = new LongAdder(); + private final LongAdder total = new LongAdder(); + private final AtomicLong min = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong max = new AtomicLong(Long.MIN_VALUE); + + void add(final long value) { + count.increment(); + total.add(value); + min.updateAndGet(prev -> Math.min(prev, value)); + max.updateAndGet(prev -> Math.max(prev, value)); + } + + long count() { + return count.sum(); + } + + long total() { + return total.sum(); + } + + long min() { + final long value = min.get(); + if (value == Long.MAX_VALUE) { + return 0L; + } + return value; + } + + long max() { + final long value = max.get(); + if (value == Long.MIN_VALUE) { + return 0L; + } + return value; + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java new file mode 100644 index 0000000000000..4cb5f8a58247c --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.assertCounter; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.randomCacheRangeSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class CacheBufferedIndexInputStatsTests extends ESIndexInputTestCase { + + private static final int MAX_FILE_LENGTH = 10_000; + + public void testOpenCount() throws Exception { + executeTestCase(createCacheService(random()), + (fileName, fileContent, cacheDirectory) -> { + try { + for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { + IndexInputStats inputStats = cacheDirectory.getStats(fileName); + assertThat(inputStats, (i == 0L) ? nullValue() : notNullValue()); + + final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); + inputStats = cacheDirectory.getStats(fileName); + assertThat(inputStats.getOpened().longValue(), equalTo(i + 1L)); + input.close(); + } + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testInnerOpenCount() throws Exception { + final ByteSizeValue rangeSize = randomCacheRangeSize(random()); + final CacheService noEvictionCacheService = new CacheService(new ByteSizeValue(1, ByteSizeUnit.GB), rangeSize); + + executeTestCase(noEvictionCacheService, + (fileName, fileContent, cacheDirectory) -> { + try { + assertThat( cacheDirectory.getStats(fileName), nullValue()); + + final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); + for (int j = 0; j < input.length(); j++) { + input.readByte(); + } + input.close(); + + final IndexInputStats inputStats = cacheDirectory.getStats(fileName); + assertThat("Inner IndexInput should have been opened for each cached range to write", + inputStats.getInnerOpened().longValue(), equalTo(numberOfRanges(input.length(), rangeSize.getBytes()))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testCloseCount() throws Exception { + executeTestCase(createCacheService(random()), + (fileName, fileContent, cacheDirectory) -> { + try { + for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { + final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); + + IndexInputStats inputStats = cacheDirectory.getStats(fileName); + assertThat(inputStats, notNullValue()); + + assertThat(inputStats.getClosed().longValue(), equalTo(i)); + input.close(); + assertThat(inputStats.getClosed().longValue(), equalTo(i + 1L)); + } + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testCachedBytesReadsAndWrites() throws Exception { + // a cache service with a low range size but enough space to not evict the cache file + final ByteSizeValue rangeSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); + final CacheService cacheService = new CacheService(new ByteSizeValue(1, ByteSizeUnit.GB), rangeSize); + + executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + try (IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random()))) { + final long length = input.length(); + + IndexInputStats inputStats = cacheDirectory.getStats(fileName); + assertThat(inputStats, notNullValue()); + + randomReadAndSlice(input, Math.toIntExact(length)); + + assertThat(inputStats.getCachedBytesWritten(), notNullValue()); + assertThat(inputStats.getCachedBytesWritten().total(), equalTo(length)); + assertThat(inputStats.getCachedBytesWritten().count(), equalTo(numberOfRanges(length, rangeSize.getBytes()))); + assertThat(inputStats.getCachedBytesWritten().min(), greaterThan(0L)); + assertThat(inputStats.getCachedBytesWritten().max(), + (length < rangeSize.getBytes()) ? equalTo(length) : equalTo(rangeSize.getBytes())); + + assertThat(inputStats.getCachedBytesRead(), notNullValue()); + assertThat(inputStats.getCachedBytesRead().total(), greaterThanOrEqualTo(length)); + assertThat(inputStats.getCachedBytesRead().count(), greaterThan(0L)); + assertThat(inputStats.getCachedBytesRead().min(), greaterThan(0L)); + assertThat(inputStats.getCachedBytesRead().max(), + (length < rangeSize.getBytes()) ? lessThanOrEqualTo(length) : lessThanOrEqualTo(rangeSize.getBytes())); + + assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); + + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testDirectBytesReads() throws Exception { + final CacheService noDiskSpaceLeftCacheService + = new CacheService(new ByteSizeValue(0, ByteSizeUnit.BYTES), new ByteSizeValue(0, ByteSizeUnit.BYTES)); + + executeTestCase(noDiskSpaceLeftCacheService, (fileName, fileContent, cacheDirectory) -> { + assertThat(cacheDirectory.getStats(fileName), nullValue()); + final IOContext ioContext = newIOContext(random()); + + try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { + final IndexInputStats inputStats = cacheDirectory.getStats(fileName); + + // account for internal buffered reads + final long bufferSize = (long) BufferedIndexInput.bufferSize(ioContext); + final long remaining = input.length() % bufferSize; + final long expectedTotal = input.length(); + final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 1L : 0L); + final long minRead = remaining > 0L ? remaining : bufferSize; + final long maxRead = input.length() < bufferSize ? input.length() : bufferSize; + + // read all index input sequentially as it simplifies testing + final byte[] readBuffer = new byte[512]; + for (long i = 0L; i < input.length();) { + int size = between(1, Math.toIntExact(Math.min(readBuffer.length, input.length() - input.getFilePointer()))); + input.readBytes(readBuffer, 0, size); + i += size; + + // direct cache file reads are aligned with the internal buffer + long currentCount = i / bufferSize + (i % bufferSize > 0L ? 1L : 0L); + if (currentCount < expectedCount) { + assertCounter(inputStats.getDirectBytesRead(), currentCount * bufferSize, currentCount, bufferSize, bufferSize); + } else { + assertCounter(inputStats.getDirectBytesRead(), expectedTotal, expectedCount, minRead, maxRead); + } + } + + // cache file has never been written nor read + assertCounter(inputStats.getCachedBytesWritten(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getCachedBytesRead(), 0L, 0L, 0L, 0L); + + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testReadBytesContiguously() throws Exception { + // use default cache service settings + final CacheService cacheService = new CacheService(Settings.EMPTY); + + executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + final IOContext ioContext = newIOContext(random()); + + try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { + final IndexInputStats inputStats = cacheDirectory.getStats(fileName); + + // account for the CacheBufferedIndexInput internal buffer + final long bufferSize = (long) BufferedIndexInput.bufferSize(ioContext); + final long remaining = input.length() % bufferSize; + final long expectedTotal = input.length(); + final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 1L : 0L); + final long minRead = remaining > 0L ? remaining : bufferSize; + final long maxRead = input.length() < bufferSize ? input.length() : bufferSize; + + final byte[] readBuffer = new byte[512]; + + // read the input input sequentially + for (long bytesRead = 0L; bytesRead < input.length();) { + int size = between(1, Math.toIntExact(Math.min(readBuffer.length, input.length() - bytesRead))); + input.readBytes(readBuffer, 0, size); + bytesRead += size; + + // cache file reads are aligned with internal buffered reads + long currentCount = bytesRead / bufferSize + (bytesRead % bufferSize > 0L ? 1L : 0L); + if (currentCount < expectedCount) { + assertCounter(inputStats.getContiguousReads(), currentCount * bufferSize, currentCount, bufferSize, bufferSize); + assertCounter(inputStats.getCachedBytesRead(), currentCount * bufferSize, currentCount, bufferSize, bufferSize); + + } else { + assertCounter(inputStats.getContiguousReads(), expectedTotal, expectedCount, minRead, maxRead); + assertCounter(inputStats.getCachedBytesRead(), expectedTotal, expectedCount, minRead, maxRead); + } + } + + // cache file has been written in a single chunk + assertCounter(inputStats.getCachedBytesWritten(), input.length(), 1L, input.length(), input.length()); + + assertCounter(inputStats.getNonContiguousReads(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); + + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testReadBytesNonContiguously() throws Exception { + // use default cache service settings + final CacheService cacheService = new CacheService(Settings.EMPTY); + + executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + final IOContext ioContext = newIOContext(random()); + + try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { + final IndexInputStats inputStats = cacheDirectory.getStats(fileName); + + long totalBytesRead = 0L; + long minBytesRead = Long.MAX_VALUE; + long maxBytesRead = Long.MIN_VALUE; + + for (long i = 1L; i <= randomLongBetween(1L, 10L); i++) { + final long randomPosition = randomLongBetween(1L, input.length() - 1L); + input.seek(randomPosition); + + final byte[] readBuffer = new byte[512]; + int size = between(1, Math.toIntExact(Math.min(readBuffer.length, input.length() - randomPosition))); + input.readBytes(readBuffer, 0, size); + + // BufferedIndexInput tries to read as much bytes as possible + final long bytesRead = Math.min(BufferedIndexInput.bufferSize(ioContext), input.length() - randomPosition); + totalBytesRead += bytesRead; + minBytesRead = (bytesRead < minBytesRead) ? bytesRead : minBytesRead; + maxBytesRead = (bytesRead > maxBytesRead) ? bytesRead : maxBytesRead; + + assertCounter(inputStats.getNonContiguousReads(), totalBytesRead, i, minBytesRead, maxBytesRead); + + // seek to the beginning forces a refill of the internal buffer (and simplifies a lot the test) + input.seek(0L); + } + + // cache file has been written in a single chunk + assertCounter(inputStats.getCachedBytesWritten(), input.length(), 1L, input.length(), input.length()); + + assertCounter(inputStats.getContiguousReads(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); + + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + private static void executeTestCase(CacheService cacheService, TriConsumer test) throws Exception { + final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(10, MAX_FILE_LENGTH)).getBytes(StandardCharsets.UTF_8); + executeTestCase(cacheService, randomAlphaOfLength(10), fileContent, test); + } + + private static void executeTestCase(CacheService cacheService, String fileName, byte[] fileContent, + TriConsumer test) throws Exception { + + final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); + final IndexId indexId = new IndexId("_name", "_uuid"); + final ShardId shardId = new ShardId("_name", "_uuid", 0); + + try (CacheService ignored = cacheService; + Directory directory = newDirectory(); + CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, createTempDir(), snapshotId, indexId, shardId) + ) { + cacheService.start(); + assertThat(cacheDirectory.getStats(fileName), nullValue()); + + final IndexOutput indexOutput = directory.createOutput(fileName, newIOContext(random())); + indexOutput.writeBytes(fileContent, fileContent.length); + indexOutput.close(); + + test.apply(fileName, fileContent, cacheDirectory); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java index 7dea944d754ef..444931ab2394f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java @@ -11,8 +11,6 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; @@ -24,12 +22,14 @@ import java.util.Objects; import java.util.concurrent.atomic.LongAdder; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; import static org.hamcrest.Matchers.equalTo; public class CacheBufferedIndexInputTests extends ESIndexInputTestCase { public void testRandomReads() throws IOException { - try (CacheService cacheService = createCacheService()) { + try (CacheService cacheService = createCacheService(random())) { cacheService.start(); SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); @@ -68,25 +68,6 @@ public void testRandomReads() throws IOException { } } - private CacheService createCacheService() { - final ByteSizeValue cacheSize = new ByteSizeValue(randomIntBetween(1, 100), - randomFrom(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB)); - final ByteSizeValue rangeSize = new ByteSizeValue(randomIntBetween(1, 100), - randomFrom(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB)); - return new CacheService(cacheSize, rangeSize); - } - - private static long numberOfRanges(int fileSize, int rangeSize) { - long numberOfRanges = fileSize / rangeSize; - if (fileSize % rangeSize > 0) { - numberOfRanges++; - } - if (numberOfRanges == 0) { - numberOfRanges++; - } - return numberOfRanges; - } - /** * FilterDirectory that provides a single IndexInput with a given name and content. */ diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStatsTests.java new file mode 100644 index 0000000000000..040c2808af3ba --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStatsTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats.SEEKING_THRESHOLD; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.assertCounter; + +public class IndexInputStatsTests extends ESTestCase { + + public void testReads() { + final long fileLength = randomLongBetween(1L, 1_000L); + final IndexInputStats inputStats = new IndexInputStats(fileLength); + + assertCounter(inputStats.getContiguousReads(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getNonContiguousReads(), 0L, 0L, 0L, 0L); + + final IndexInputStats.Counter contiguous = new IndexInputStats.Counter(); + final IndexInputStats.Counter nonContiguous = new IndexInputStats.Counter(); + + for (int i = 0; i < randomIntBetween(1, 50); i++) { + final long currentPosition = randomLongBetween(0L, inputStats.getFileLength() - 1L); + final long previousPosition = randomBoolean() ? currentPosition : randomLongBetween(0L, inputStats.getFileLength() - 1L); + final int bytesRead = randomIntBetween(1, Math.toIntExact(Math.max(1L, inputStats.getFileLength() - currentPosition))); + + inputStats.incrementBytesRead(previousPosition, currentPosition, bytesRead); + + if (previousPosition == currentPosition) { + contiguous.add(bytesRead); + } else { + nonContiguous.add(bytesRead); + } + } + + assertCounter(inputStats.getContiguousReads(), + contiguous.total(), contiguous.count(), contiguous.min(), contiguous.max()); + assertCounter(inputStats.getNonContiguousReads(), + nonContiguous.total(), nonContiguous.count(), nonContiguous.min(), nonContiguous.max()); + } + + public void testSeeks() { + final long fileLength = randomLongBetween(1L, 1_000L); + final IndexInputStats inputStats = new IndexInputStats(fileLength); + + assertCounter(inputStats.getForwardSmallSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getForwardLargeSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getBackwardSmallSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getBackwardLargeSeeks(), 0L, 0L, 0L, 0L); + + final IndexInputStats.Counter fwSmallSeeks = new IndexInputStats.Counter(); + final IndexInputStats.Counter fwLargeSeeks = new IndexInputStats.Counter(); + final IndexInputStats.Counter bwSmallSeeks = new IndexInputStats.Counter(); + final IndexInputStats.Counter bwLargeSeeks = new IndexInputStats.Counter(); + + for (int i = 0; i < randomIntBetween(1, 50); i++) { + final long currentPosition = randomLongBetween(0L, fileLength); + final long seekToPosition = randomLongBetween(0L, fileLength); + inputStats.incrementSeeks(currentPosition, seekToPosition); + + final long delta = seekToPosition - currentPosition; + if (delta > 0) { + IndexInputStats.Counter forwardCounter = (delta <= SEEKING_THRESHOLD.getBytes()) ? fwSmallSeeks : fwLargeSeeks; + forwardCounter.add(delta); + } else if (delta < 0) { + IndexInputStats.Counter backwardCounter = (delta >= -1 * SEEKING_THRESHOLD.getBytes()) ? bwSmallSeeks : bwLargeSeeks; + backwardCounter.add(delta); + } + } + + assertCounter(inputStats.getForwardSmallSeeks(), + fwSmallSeeks.total(), fwSmallSeeks.count(), fwSmallSeeks.min(), fwSmallSeeks.max()); + assertCounter(inputStats.getForwardLargeSeeks(), + fwLargeSeeks.total(), fwLargeSeeks.count(), fwLargeSeeks.min(), fwLargeSeeks.max()); + + assertCounter(inputStats.getBackwardSmallSeeks(), + bwSmallSeeks.total(), bwSmallSeeks.count(), bwSmallSeeks.min(), bwSmallSeeks.max()); + assertCounter(inputStats.getBackwardLargeSeeks(), + bwLargeSeeks.total(), bwLargeSeeks.count(), bwLargeSeeks.min(), bwLargeSeeks.max()); + } + + public void testSeekToSamePosition() { + final IndexInputStats inputStats = new IndexInputStats(randomLongBetween(1L, 1_000L)); + final long position = randomLongBetween(0L, inputStats.getFileLength()); + + inputStats.incrementSeeks(position, position); + + assertCounter(inputStats.getForwardSmallSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getForwardLargeSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getBackwardSmallSeeks(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getBackwardLargeSeeks(), 0L, 0L, 0L, 0L); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java new file mode 100644 index 0000000000000..f3023a0a8969d --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.List; +import java.util.Random; + +import static com.carrotsearch.randomizedtesting.generators.RandomNumbers.randomIntBetween; +import static com.carrotsearch.randomizedtesting.generators.RandomPicks.randomFrom; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +final class TestUtils { + private TestUtils() { + } + + static CacheService createCacheService(final Random random) { + final ByteSizeValue cacheSize = new ByteSizeValue(randomIntBetween(random, 1, 100), + randomFrom(random, List.of(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB))); + return new CacheService(cacheSize, randomCacheRangeSize(random)); + } + + static ByteSizeValue randomCacheRangeSize(final Random random) { + return new ByteSizeValue(randomIntBetween(random, 1, 100), + randomFrom(random, List.of(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB))); + } + + static long numberOfRanges(long fileSize, long rangeSize) { + return numberOfRanges(Math.toIntExact(fileSize), Math.toIntExact(rangeSize)); + } + + static long numberOfRanges(int fileSize, int rangeSize) { + long numberOfRanges = fileSize / rangeSize; + if (fileSize % rangeSize > 0) { + numberOfRanges++; + } + if (numberOfRanges == 0) { + numberOfRanges++; + } + return numberOfRanges; + } + + static void assertCounter(IndexInputStats.Counter counter, long total, long count, long min, long max) { + assertThat(counter.total(), equalTo(total)); + assertThat(counter.count(), equalTo(count)); + assertThat(counter.min(), equalTo(min)); + assertThat(counter.max(), equalTo(max)); + } +}