Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.Tuple;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -86,6 +87,21 @@ public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
}

/**
 * Opens a stream over a byte range of the given blob.
 *
 * @param blobName name of the blob, resolved to an S3 key through {@code buildKey}
 * @param position offset of the first byte to read; must not be negative
 * @param length   number of bytes to read; must not be negative
 * @return an empty in-memory stream when {@code length} is zero, otherwise a retrying S3 stream
 *         covering {@code [position, position + length - 1]}
 * @throws IllegalArgumentException if {@code position} or {@code length} is negative
 * @throws ArithmeticException      if the last requested offset does not fit in a {@code long}
 */
@Override
public InputStream readBlob(String blobName, long position, int length) throws IOException {
    if (position < 0L) {
        throw new IllegalArgumentException("position must be non-negative");
    }
    if (length < 0) {
        throw new IllegalArgumentException("length must be non-negative");
    }
    if (length == 0) {
        // nothing to fetch, so no request is issued at all
        return new ByteArrayInputStream(new byte[0]);
    }
    final String key = buildKey(blobName);
    // the retrying stream is given an inclusive end offset, hence the "- 1"
    final long lastByteInclusive = Math.addExact(position, length - 1);
    return new S3RetryingInputStream(blobStore, key, position, lastByteInclusive);
}

/**
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class S3RetryingInputStream extends InputStream {

private final S3BlobStore blobStore;
private final String blobKey;
private final long start;
private final long end;
private final int maxAttempts;

private InputStream currentStream;
Expand All @@ -58,17 +60,32 @@ class S3RetryingInputStream extends InputStream {
private boolean closed;

/**
 * Creates a stream over the blob starting at offset 0, delegating to the ranged constructor with
 * the largest end offset that constructor accepts ({@code Long.MAX_VALUE - 1}).
 */
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
    this(blobStore, blobKey, 0L, Long.MAX_VALUE - 1);
}

// both start and end are inclusive bounds, following the definition in GetObjectRequest.setRange
S3RetryingInputStream(S3BlobStore blobStore, String blobKey, long start, long end) throws IOException {
if (start < 0L) {
throw new IllegalArgumentException("start must be non-negative");
}
if (end < 0L || end == Long.MAX_VALUE || end < start) {
throw new IllegalArgumentException("end must be non-negative and not Long.MAX_VALUE and >= start");
}
this.blobStore = blobStore;
this.blobKey = blobKey;
this.maxAttempts = blobStore.getMaxRetries() + 1;
this.start = start;
this.end = end;
currentStream = openStream();
}

private InputStream openStream() throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
if (currentOffset > 0) {
getObjectRequest.setRange(currentOffset);
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
assert start + currentOffset <= end :
"requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;
getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
}
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
return s3Object.getObjectContent();
Expand Down Expand Up @@ -125,7 +142,7 @@ private void reopenStreamOrFail(IOException e) throws IOException {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying",
blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e);
blobStore.bucket(), blobKey, start + currentOffset, attempt, maxAttempts), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
Expand All @@ -58,6 +59,7 @@
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -71,9 +73,11 @@
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
* This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
Expand Down Expand Up @@ -139,8 +143,12 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries,

/**
 * Checks that both the full read and the ranged read of a blob that does not exist fail with a
 * {@link NoSuchFileException} whose message names the missing blob.
 */
public void testReadNonexistentBlobThrowsNoSuchFileException() {
    final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
    final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob"));
    assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));

    // A ranged read of length 0 returns an empty in-memory stream without issuing any request, so it can never
    // fail with NoSuchFileException: always ask for at least one byte. Capping position at Long.MAX_VALUE - 2
    // keeps the upper bound for length at 1 or more and the inclusive end offset below Long.MAX_VALUE.
    final long position = randomLongBetween(0, Long.MAX_VALUE - 2);
    final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, Long.MAX_VALUE - 1 - position)));
    final Exception rangeException = expectThrows(NoSuchFileException.class,
        () -> blobContainer.readBlob("read_nonexistent_blob", position, length));
    assertThat(rangeException.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
}

public void testReadBlobWithRetries() throws Exception {
Expand All @@ -153,6 +161,7 @@ public void testReadBlobWithRetries() throws Exception {
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
assertEquals(Optional.empty(), getRangeEnd(exchange));
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
Expand All @@ -178,6 +187,50 @@ public void testReadBlobWithRetries() throws Exception {
}
}

/**
 * Reads a random range of a blob while the HTTP handler fails or truncates responses until the
 * countdown reports completion, then checks that exactly the requested range (clipped to the blob size) is returned.
 */
public void testReadRangeBlobWithRetries() throws Exception {
    final int maxRetries = randomInt(5);
    final CountDown countDown = new CountDown(maxRetries + 1);

    final byte[] bytes = randomBlobContent();
    httpServer.createContext("/bucket/read_blob_max_retries", exchange -> {
        Streams.readFully(exchange.getRequestBody());
        if (countDown.countDown()) {
            // the countdown is done: answer this request with the complete requested range
            final int rangeStart = getRangeStart(exchange);
            assertThat(rangeStart, lessThan(bytes.length));
            // parse the header once and reuse the result
            final Optional<Integer> requestedRangeEnd = getRangeEnd(exchange);
            assertTrue(requestedRangeEnd.isPresent());
            final int rangeEnd = requestedRangeEnd.get();
            assertThat(rangeEnd, greaterThanOrEqualTo(rangeStart));
            // adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
            final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
            final int length = (effectiveRangeEnd - rangeStart) + 1;
            exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
            exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
            exchange.getResponseBody().write(bytes, rangeStart, length);
            exchange.close();
            return;
        }
        // otherwise misbehave at random: error status, truncated body, or no response at all
        if (randomBoolean()) {
            exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
                HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
        } else if (randomBoolean()) {
            sendIncompleteContent(exchange, bytes);
        }
        if (randomBoolean()) {
            exchange.close();
        }
    });

    final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
    final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
    final int position = randomIntBetween(0, bytes.length - 1);
    final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
    // length may be as large as Integer.MAX_VALUE, so position + length must be evaluated as a long:
    // in int arithmetic it can wrap to a negative value and make Arrays.copyOfRange throw
    final int expectedEnd = Math.toIntExact(Math.min(bytes.length, (long) position + length));
    try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries", position, length)) {
        assertArrayEquals(Arrays.copyOfRange(bytes, position, expectedEnd),
            BytesReference.toBytes(Streams.readFully(inputStream)));
        // a zero-length read is expected not to run the countdown to completion
        assertThat(countDown.isCountedDown(), is(length > 0));
    }
}

public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
Expand All @@ -194,8 +247,12 @@ public void testReadBlobWithReadTimeouts() {
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes));

final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
exception = expectThrows(SocketTimeoutException.class, () -> {
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete") :
blobContainer.readBlob("read_blob_incomplete", position, length)) {
Streams.readFully(stream);
}
});
Expand All @@ -209,7 +266,14 @@ public void testReadBlobWithNoHttpResponse() {
// HTTP server closes connection immediately
httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close);

Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_no_response"));
Exception exception = expectThrows(SdkClientException.class,
() -> {
if (randomBoolean()) {
blobContainer.readBlob("read_blob_no_response");
} else {
blobContainer.readBlob("read_blob_no_response", 0, 1);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond"));
assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class));
assertThat(exception.getSuppressed().length, equalTo(0));
Expand All @@ -227,7 +291,9 @@ public void testReadBlobWithPrematureConnectionClose() {
});

final Exception exception = expectThrows(ConnectionClosedException.class, () -> {
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete") :
blobContainer.readBlob("read_blob_no_response", 0, 1)) {
Streams.readFully(stream);
}
});
Expand Down Expand Up @@ -397,23 +463,47 @@ private static byte[] randomBlobContent() {
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
}

private static int getRangeStart(HttpExchange exchange) {
private static Tuple<Long, Long> getRange(HttpExchange exchange) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
if (rangeHeader == null) {
return 0;
return Tuple.tuple(0L, Long.MAX_VALUE - 1);
}

final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader);
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader);
assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
return Math.toIntExact(Long.parseLong(matcher.group(1)));
long rangeStart = Long.parseLong(matcher.group(1));
long rangeEnd = Long.parseLong(matcher.group(2));
assertThat(rangeStart, lessThanOrEqualTo(rangeEnd));
return Tuple.tuple(rangeStart, rangeEnd);
}

/**
 * Returns the first offset of the requested range as an {@code int}.
 *
 * @throws ArithmeticException if the offset does not fit in an {@code int}
 */
private static int getRangeStart(HttpExchange exchange) {
    final long rangeStart = getRange(exchange).v1();
    return Math.toIntExact(rangeStart);
}

/**
 * Returns the last offset of the requested range, or an empty {@link Optional} when the end
 * reported by {@code getRange} is {@code Long.MAX_VALUE - 1}.
 *
 * @throws ArithmeticException if the end offset does not fit in an {@code int}
 */
private static Optional<Integer> getRangeEnd(HttpExchange exchange) {
    final long rangeEnd = getRange(exchange).v2();
    final boolean openEnded = rangeEnd == Long.MAX_VALUE - 1;
    return openEnded ? Optional.<Integer>empty() : Optional.of(Math.toIntExact(rangeEnd));
}

private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
final int length;
if (rangeEnd.isPresent()) {
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1);
length = effectiveRangeEnd - rangeStart;
} else {
length = bytes.length - rangeStart - 1;
}
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1);
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
final int bytesToSend = randomIntBetween(0, length - 1);
if (bytesToSend > 0) {
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public InputStream readBlob(String name) throws IOException {

@Override
public InputStream readBlob(String blobName, long position, int length) throws IOException {
final InputStream inputStream = readBlob(blobName);
final InputStream inputStream = readBlob(blobName); // TODO: limit the consumable size of the input stream here to length
long skipped = inputStream.skip(position); // NORELEASE
assert skipped == position;
return inputStream;
Expand Down