Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileVisitResult;
Expand Down Expand Up @@ -142,22 +144,28 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
IOUtils.rm(blobNames.stream().map(path::resolve).toArray(Path[]::new));
}

private InputStream bufferedInputStream(InputStream inputStream) {
return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes());
}

@Override
public InputStream readBlob(String name) throws IOException {
final Path resolvedPath = path.resolve(name);
try {
return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
return bufferedInputStream(Files.newInputStream(resolvedPath));
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + name + "] blob not found");
}
}

@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
final InputStream inputStream = readBlob(blobName);
long skipped = inputStream.skip(position); // NORELEASE
assert skipped == position;
return org.elasticsearch.common.io.Streams.limitStream(inputStream, length);
final SeekableByteChannel channel = Files.newByteChannel(path.resolve(blobName));
if (position > 0L) {
channel.position(position);
}
assert channel.position() == position;
return bufferedInputStream(org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,78 @@
*/
package org.elasticsearch.common.blobstore.fs;

import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.apache.lucene.mockfile.FilterSeekableByteChannel;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.spi.FileSystemProvider;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

@LuceneTestCase.SuppressFileSystems("*") // we do our own mocking
public class FsBlobContainerTests extends ESTestCase {

final AtomicLong totalBytesRead = new AtomicLong(0);
FileSystem fileSystem = null;

@Before
public void setupMockFileSystems() {
FileSystemProvider fileSystemProvider = new MockFileSystemProvider(PathUtils.getDefaultFileSystem(), totalBytesRead::addAndGet);
fileSystem = fileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem); // restored by restoreFileSystem in ESTestCase
}

@After
public void closeMockFileSystems() throws IOException {
IOUtils.close(fileSystem);
}

public void testReadBlobRangeCorrectlySkipBytes() throws IOException {
final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb

final Path path = PathUtils.get(createTempDir().toString());
Files.write(path.resolve(blobName), blobData);

final FsBlobContainer container = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, path, false), BlobPath.cleanPath(), path);
assertThat(totalBytesRead.get(), equalTo(0L));

final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1));
final long length = randomLongBetween(1L, blobData.length - start);

try (InputStream stream = container.readBlob(blobName, start, length)) {
assertThat(totalBytesRead.get(), equalTo(0L));
assertThat(Streams.consumeFully(stream), equalTo(length));
assertThat(totalBytesRead.get(), equalTo(length));
}
}

public void testTempBlobName() {
final String blobName = randomAlphaOfLengthBetween(1, 20);
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
Expand All @@ -37,4 +101,48 @@ public void testIsTempBlobName() {
final String tempBlobName = FsBlobContainer.tempBlobName(randomAlphaOfLengthBetween(1, 20));
assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
}

static class MockFileSystemProvider extends FilterFileSystemProvider {

final Consumer<Long> onRead;

MockFileSystemProvider(FileSystem inner, Consumer<Long> onRead) {
super("mockfs://", inner);
this.onRead = onRead;
}

private int onRead(int read) {
if (read != -1) {
onRead.accept((long) read);
}
return read;
}

@Override
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> opts, FileAttribute<?>... attrs) throws IOException {
return new FilterSeekableByteChannel(super.newByteChannel(path, opts, attrs)) {
@Override
public int read(ByteBuffer dst) throws IOException {
return onRead(super.read(dst));
}
};
}

@Override
public InputStream newInputStream(Path path, OpenOption... opts) throws IOException {
// no super.newInputStream(path, opts) as it will use the delegating FileSystem to open a SeekableByteChannel
// and instead we want the mocked newByteChannel() method to be used
return new FilterInputStream(delegate.newInputStream(path, opts)) {
@Override
public int read() throws IOException {
return onRead(super.read());
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return onRead(super.read(b, off, len));
}
};
}
}
}