diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 12889ea7e33b0..42965d49003de 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -34,6 +34,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.SeekableByteChannel; import java.nio.file.DirectoryStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileVisitResult; @@ -142,11 +144,15 @@ public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOExce IOUtils.rm(blobNames.stream().map(path::resolve).toArray(Path[]::new)); } + private InputStream bufferedInputStream(InputStream inputStream) { + return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes()); + } + @Override public InputStream readBlob(String name) throws IOException { final Path resolvedPath = path.resolve(name); try { - return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes()); + return bufferedInputStream(Files.newInputStream(resolvedPath)); } catch (FileNotFoundException fnfe) { throw new NoSuchFileException("[" + name + "] blob not found"); } @@ -154,10 +160,12 @@ public InputStream readBlob(String name) throws IOException { @Override public InputStream readBlob(String blobName, long position, long length) throws IOException { - final InputStream inputStream = readBlob(blobName); - long skipped = inputStream.skip(position); // NORELEASE - assert skipped == position; - return org.elasticsearch.common.io.Streams.limitStream(inputStream, length); + final SeekableByteChannel channel = Files.newByteChannel(path.resolve(blobName)); + if (position > 0L) { + channel.position(position); + } + assert channel.position() == position; + return bufferedInputStream(org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length)); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index c603eda906cae..8e9bc382cb842 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -18,14 +18,78 @@ */ package org.elasticsearch.common.blobstore.fs; +import org.apache.lucene.mockfile.FilterFileSystemProvider; +import org.apache.lucene.mockfile.FilterSeekableByteChannel; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.PathUtilsForTesting; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; +import org.junit.After; +import org.junit.Before; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.spi.FileSystemProvider; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; +@LuceneTestCase.SuppressFileSystems("*") // we do our own mocking public class FsBlobContainerTests extends ESTestCase { + final AtomicLong totalBytesRead = new AtomicLong(0); + FileSystem fileSystem = null; + + @Before + public void setupMockFileSystems() { + FileSystemProvider fileSystemProvider = new MockFileSystemProvider(PathUtils.getDefaultFileSystem(), totalBytesRead::addAndGet); + fileSystem = fileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); // restored by restoreFileSystem in ESTestCase + } + + @After + public void closeMockFileSystems() throws IOException { + IOUtils.close(fileSystem); + } + + public void testReadBlobRangeCorrectlySkipBytes() throws IOException { + final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb + + final Path path = PathUtils.get(createTempDir().toString()); + Files.write(path.resolve(blobName), blobData); + + final FsBlobContainer container = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, path, false), BlobPath.cleanPath(), path); + assertThat(totalBytesRead.get(), equalTo(0L)); + + final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1)); + final long length = randomLongBetween(1L, blobData.length - start); + + try (InputStream stream = container.readBlob(blobName, start, length)) { + assertThat(totalBytesRead.get(), equalTo(0L)); + assertThat(Streams.consumeFully(stream), equalTo(length)); + assertThat(totalBytesRead.get(), equalTo(length)); + } + } + public void testTempBlobName() { final String blobName = randomAlphaOfLengthBetween(1, 20); final String tempBlobName = FsBlobContainer.tempBlobName(blobName); @@ -37,4 +101,48 @@ public void testIsTempBlobName() { final String tempBlobName = FsBlobContainer.tempBlobName(randomAlphaOfLengthBetween(1, 20)); assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true)); } + + static class MockFileSystemProvider extends FilterFileSystemProvider { + + final Consumer onRead; + + MockFileSystemProvider(FileSystem inner, Consumer onRead) { + super("mockfs://", inner); + this.onRead = onRead; + } + + private int onRead(int read) { + if (read != -1) { + onRead.accept((long) read); + } + return read; + } + + @Override + public SeekableByteChannel newByteChannel(Path path, Set opts, FileAttribute... attrs) throws IOException { + return new FilterSeekableByteChannel(super.newByteChannel(path, opts, attrs)) { + @Override + public int read(ByteBuffer dst) throws IOException { + return onRead(super.read(dst)); + } + }; + } + + @Override + public InputStream newInputStream(Path path, OpenOption... opts) throws IOException { + // no super.newInputStream(path, opts) as it will use the delegating FileSystem to open a SeekableByteChannel + // and instead we want the mocked newByteChannel() method to be used + return new FilterInputStream(delegate.newInputStream(path, opts)) { + @Override + public int read() throws IOException { + return onRead(super.read()); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return onRead(super.read(b, off, len)); + } + }; + } + } }