diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index bb04657848087..2e84e6d85fdd6 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; -import java.util.function.Consumer; +import java.util.function.LongConsumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -894,7 +894,7 @@ public interface RangeAvailableHandler { @FunctionalInterface public interface RangeMissingHandler { - void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, Consumer progressUpdater) + void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, LongConsumer progressUpdater) throws IOException; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/CacheFile.java index 8c0bc924b819d..9377164dd0952 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/CacheFile.java @@ -35,7 +35,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import java.util.function.LongConsumer; public class CacheFile { @@ -334,7 +334,7 @@ public interface RangeAvailableHandler { @FunctionalInterface public interface RangeMissingHandler { - void fillCacheRange(FileChannel channel, long from, long to, Consumer progressUpdater) throws IOException; + void fillCacheRange(FileChannel channel, long from, long to, LongConsumer progressUpdater) throws IOException; } /** diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java index 7f924a9cae35a..735f4e18ad8ee 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java @@ -26,9 +26,8 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Locale; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; +import java.util.concurrent.Semaphore; +import java.util.function.LongConsumer; import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes; import static org.elasticsearch.core.Strings.format; @@ -113,22 +112,12 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception { final int length = b.remaining(); final int originalByteBufPosition = b.position(); - final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); - final AtomicBoolean stopAsyncReads = new AtomicBoolean(); - // Runnable that, when called, ensures that async callbacks (such as those used by readCacheFile) are not + // Semaphore that, when all permits are acquired, ensures that async callbacks (such as those used by readCacheFile) are not // accessing the byte buffer anymore that was passed to readWithoutBlobCache - // In particular, it's important to call this method before adapting the ByteBuffer's offset - final Runnable preventAsyncBufferChanges = () -> { - luceneByteBufLock.writeLock().lock(); - try { - stopAsyncReads.set(true); - } finally { - luceneByteBufLock.writeLock().unlock(); - } - }; - + // In particular, it's important to acquire all permits before adapting the ByteBuffer's offset + final Semaphore luceneByteBufPermits = new Semaphore(Integer.MAX_VALUE); + boolean bufferWriteLocked = false; logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); - try { final ByteRange startRangeToWrite = computeRange(position); final ByteRange endRangeToWrite = computeRange(position + length - 1); @@ -149,8 +138,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception { len, b, rangeToRead.start(), - luceneByteBufLock, - stopAsyncReads + luceneByteBufPermits ), (channel, channelPos, relativePos, len, progressUpdater) -> { final long startTimeNanos = stats.currentTimeNanos(); @@ -163,12 +151,15 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception { SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME ); assert bytesRead == length : bytesRead + " vs " + length; - assert luceneByteBufLock.getReadHoldCount() == 0; + assert luceneByteBufPermits.availablePermits() == Integer.MAX_VALUE; - preventAsyncBufferChanges.run(); + luceneByteBufPermits.acquire(Integer.MAX_VALUE); + bufferWriteLocked = true; b.position(originalByteBufPosition + bytesRead); // mark all bytes as accounted for } finally { - preventAsyncBufferChanges.run(); + if (bufferWriteLocked == false) { + luceneByteBufPermits.acquire(Integer.MAX_VALUE); + } } } @@ -188,8 +179,7 @@ private int readCacheFile( long length, final ByteBuffer buffer, long logicalPos, - ReentrantReadWriteLock luceneByteBufLock, - AtomicBoolean stopAsyncReads + Semaphore luceneByteBufPermits ) throws IOException { logger.trace( "{}: reading cached {} logical {} channel {} pos {} length {} (details: {})", @@ -205,21 +195,10 @@ private int readCacheFile( return 0; } final int bytesRead; - if (luceneByteBufLock.readLock().tryLock()) { + if (luceneByteBufPermits.tryAcquire()) { try { - boolean shouldStopReading = stopAsyncReads.get(); - if (shouldStopReading) { - // return fake response - return Math.toIntExact(length); - } // create slice that is positioned to read the given values - final ByteBuffer dup = buffer.duplicate(); - final int newPosition = dup.position() + Math.toIntExact(relativePos); - assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit(); - assert newPosition + length <= buffer.limit() - : "oldpos " + dup.position() + " newpos " + newPosition + " length " + length + " limit " + buffer.limit(); - dup.position(newPosition); - dup.limit(newPosition + Math.toIntExact(length)); + final ByteBuffer dup = buffer.slice(buffer.position() + Math.toIntExact(relativePos), Math.toIntExact(length)); bytesRead = fc.read(dup, channelPos); if (bytesRead == -1) { throw new EOFException( @@ -233,7 +212,7 @@ private int readCacheFile( ); } } finally { - luceneByteBufLock.readLock().unlock(); + luceneByteBufPermits.release(); } } else { // return fake response @@ -263,7 +242,7 @@ private void writeCacheFile( final long fileChannelPos, final long relativePos, final long length, - final Consumer progressUpdater, + final LongConsumer progressUpdater, final long startTimeNanos ) throws IOException { assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java index a599da2c073c9..9191662a334a2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java @@ -41,7 +41,7 @@ import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; +import java.util.function.LongConsumer; import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes; import static org.elasticsearch.core.Strings.format; @@ -287,7 +287,7 @@ protected int readCacheFile(final FileChannel fc, final long position, final Byt return bytesRead; } - protected void writeCacheFile(final FileChannel fc, final long start, final long end, final Consumer progressUpdater) + protected void writeCacheFile(final FileChannel fc, final long start, final long end, final LongConsumer progressUpdater) throws IOException { assert assertFileChannelOpen(fc); assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);