Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -894,7 +894,7 @@ public interface RangeAvailableHandler {

@FunctionalInterface
public interface RangeMissingHandler {
void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, Consumer<Long> progressUpdater)
void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, LongConsumer progressUpdater)
throws IOException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

public class CacheFile {

Expand Down Expand Up @@ -334,7 +334,7 @@ public interface RangeAvailableHandler {

@FunctionalInterface
public interface RangeMissingHandler {
void fillCacheRange(FileChannel channel, long from, long to, Consumer<Long> progressUpdater) throws IOException;
void fillCacheRange(FileChannel channel, long from, long to, LongConsumer progressUpdater) throws IOException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.concurrent.Semaphore;
import java.util.function.LongConsumer;

import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -113,22 +112,12 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
final int length = b.remaining();
final int originalByteBufPosition = b.position();

final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock();
final AtomicBoolean stopAsyncReads = new AtomicBoolean();
// Runnable that, when called, ensures that async callbacks (such as those used by readCacheFile) are not
// Semaphore that, when all permits are acquired, ensures that async callbacks (such as those used by readCacheFile) are not
// accessing the byte buffer anymore that was passed to readWithoutBlobCache
// In particular, it's important to call this method before adapting the ByteBuffer's offset
final Runnable preventAsyncBufferChanges = () -> {
luceneByteBufLock.writeLock().lock();
try {
stopAsyncReads.set(true);
} finally {
luceneByteBufLock.writeLock().unlock();
}
};

// In particular, it's important to acquire all permits before adapting the ByteBuffer's offset
final Semaphore luceneByteBufPermits = new Semaphore(Integer.MAX_VALUE);
boolean bufferWriteLocked = false;
logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);

try {
final ByteRange startRangeToWrite = computeRange(position);
final ByteRange endRangeToWrite = computeRange(position + length - 1);
Expand All @@ -149,8 +138,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
len,
b,
rangeToRead.start(),
luceneByteBufLock,
stopAsyncReads
luceneByteBufPermits
),
(channel, channelPos, relativePos, len, progressUpdater) -> {
final long startTimeNanos = stats.currentTimeNanos();
Expand All @@ -163,12 +151,15 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
);
assert bytesRead == length : bytesRead + " vs " + length;
assert luceneByteBufLock.getReadHoldCount() == 0;
assert luceneByteBufPermits.availablePermits() == Integer.MAX_VALUE;

preventAsyncBufferChanges.run();
luceneByteBufPermits.acquire(Integer.MAX_VALUE);
bufferWriteLocked = true;
b.position(originalByteBufPosition + bytesRead); // mark all bytes as accounted for
} finally {
preventAsyncBufferChanges.run();
if (bufferWriteLocked == false) {
luceneByteBufPermits.acquire(Integer.MAX_VALUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend to release here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is just a paranoid fail-safe like we had in the previous version of this, we never release. We always just acquire all of them to close the buffer for writes for good to have the same behaviour we previously had via the boolean but simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks. I wonder if we could have some sort of ByteBuffer wrapper that we could "invalidate" such at it then prevent any reading from it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(not blocking the PR to be merged)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see later I'd say. This is definitely a pattern we don't just have here but also in at least one other spot. I'm still hoping maybe we can find a way to not have to do this (passing the buffer around to other threads in general) since somehow this is never actually safe :) I'll think on it!

}
}
}

Expand All @@ -188,8 +179,7 @@ private int readCacheFile(
long length,
final ByteBuffer buffer,
long logicalPos,
ReentrantReadWriteLock luceneByteBufLock,
AtomicBoolean stopAsyncReads
Semaphore luceneByteBufPermits
) throws IOException {
logger.trace(
"{}: reading cached {} logical {} channel {} pos {} length {} (details: {})",
Expand All @@ -205,21 +195,10 @@ private int readCacheFile(
return 0;
}
final int bytesRead;
if (luceneByteBufLock.readLock().tryLock()) {
if (luceneByteBufPermits.tryAcquire()) {
try {
boolean shouldStopReading = stopAsyncReads.get();
if (shouldStopReading) {
// return fake response
return Math.toIntExact(length);
}
// create slice that is positioned to read the given values
final ByteBuffer dup = buffer.duplicate();
final int newPosition = dup.position() + Math.toIntExact(relativePos);
assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
assert newPosition + length <= buffer.limit()
: "oldpos " + dup.position() + " newpos " + newPosition + " length " + length + " limit " + buffer.limit();
dup.position(newPosition);
dup.limit(newPosition + Math.toIntExact(length));
final ByteBuffer dup = buffer.slice(buffer.position() + Math.toIntExact(relativePos), Math.toIntExact(length));
bytesRead = fc.read(dup, channelPos);
if (bytesRead == -1) {
throw new EOFException(
Expand All @@ -233,7 +212,7 @@ private int readCacheFile(
);
}
} finally {
luceneByteBufLock.readLock().unlock();
luceneByteBufPermits.release();
}
} else {
// return fake response
Expand Down Expand Up @@ -263,7 +242,7 @@ private void writeCacheFile(
final long fileChannelPos,
final long relativePos,
final long length,
final Consumer<Long> progressUpdater,
final LongConsumer progressUpdater,
final long startTimeNanos
) throws IOException {
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -287,7 +287,7 @@ protected int readCacheFile(final FileChannel fc, final long position, final Byt
return bytesRead;
}

protected void writeCacheFile(final FileChannel fc, final long start, final long end, final Consumer<Long> progressUpdater)
protected void writeCacheFile(final FileChannel fc, final long start, final long end, final LongConsumer progressUpdater)
throws IOException {
assert assertFileChannelOpen(fc);
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
Expand Down