Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/repository-gcs/qa/google-cloud-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ restResources {
}
}

testFixtures.useFixture(':test:fixtures:gcs-fixture')
testFixtures.useFixture(':test:fixtures:gcs-fixture', 'gcs-fixture')
testFixtures.useFixture(':test:fixtures:gcs-fixture', 'gcs-fixture-third-party')
boolean useFixture = false

String gcsServiceAccount = System.getenv("google_storage_service_account")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public InputStream readBlob(String blobName) throws IOException {
return blobStore.readBlob(buildKey(blobName));
}

@Override
public InputStream readBlob(final String blobName, final long position, final long length) throws IOException {
return blobStore.readBlob(buildKey(blobName), position, length);
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
Expand Down Expand Up @@ -171,7 +172,29 @@ Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), 0, Long.MAX_VALUE);
}

/**
* Returns an {@link java.io.InputStream} for the given blob's position and length
*
* @param blobName name of the blob
* @param position starting position to read from
* @param length length of bytes to read
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName, long position, long length) throws IOException {
if (position < 0L) {
throw new IllegalArgumentException("position must be non-negative");
}
if (length < 0) {
throw new IllegalArgumentException("length must be non-negative");
}
if (length == 0) {
return new ByteArrayInputStream(new byte[0]);
} else {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position, length);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -54,6 +54,9 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {

private final BlobId blobId;

private final long start;
private final long length;

private final int maxRetries;

private InputStream currentStream;
Expand All @@ -62,33 +65,81 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;

GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long length) throws IOException {
this.client = client;
this.blobId = blobId;
this.start = start;
this.length = length;
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
currentStream = openStream();
}

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;

private InputStream openStream() throws IOException {
try {
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
if (currentOffset > 0L) {
readChannel.seek(currentOffset);
}
return Channels.newInputStream(new ReadableByteChannel() {
final long end = start + length < 0L ? Long.MAX_VALUE : start + length; // inclusive
final SeekableByteChannel adaptedChannel = new SeekableByteChannel() {

long position;

@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
final long remainingBytesToRead = end - position;
assert remainingBytesToRead >= 0L;
// The SDK uses the maximum between chunk size and dst.remaining() to determine fetch size
// We can be smarter here and only fetch what's needed when we know the length
if (remainingBytesToRead < DEFAULT_CHUNK_SIZE) {
readChannel.setChunkSize(Math.toIntExact(remainingBytesToRead));
}
if (remainingBytesToRead < dst.remaining()) {
dst.limit(dst.position() + Math.toIntExact(remainingBytesToRead));
}
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
int read = SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
if (read > 0) {
position += read;
}
return read;
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage());
}
throw e;
} finally {
readChannel.setChunkSize(0); // set to default again
}
}

@Override
public int write(ByteBuffer src) {
throw new UnsupportedOperationException();
}

@Override
public long position() {
return position;
}

@Override
public SeekableByteChannel position(long newPosition) throws IOException {
readChannel.seek(newPosition);
this.position = newPosition;
return this;
}

@Override
public long size() {
return length;
}

@Override
public SeekableByteChannel truncate(long size) {
throw new UnsupportedOperationException();
}

@Override
public boolean isOpen() {
return readChannel.isOpen();
Expand All @@ -98,7 +149,11 @@ public boolean isOpen() {
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
};
if (currentOffset > 0 || start > 0) {
adaptedChannel.position(Math.addExact(start, currentOffset));
}
return Channels.newInputStream(adaptedChannel);
} catch (StorageException e) {
throw addSuppressedExceptions(e);
}
Expand Down Expand Up @@ -147,7 +202,7 @@ private void reopenStreamOrFail(StorageException e) throws IOException {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
blobId, currentOffset, attempt, maxRetries), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
Expand Down
Loading