From c75a2a00069208c68b4ebb124680b1d126731471 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 16 Jan 2020 19:41:03 +0100 Subject: [PATCH 1/8] Add ranged readBlob to S3BlobContainer --- .../repositories/s3/S3BlobContainer.java | 16 +++ .../s3/S3RetryingInputStream.java | 23 +++- .../s3/S3BlobContainerRetriesTests.java | 110 ++++++++++++++++-- .../common/blobstore/fs/FsBlobContainer.java | 2 +- 4 files changed, 137 insertions(+), 14 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 9cb118d60086f..e9f77c28f4255 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.Tuple; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -86,6 +87,21 @@ public InputStream readBlob(String blobName) throws IOException { return new S3RetryingInputStream(blobStore, buildKey(blobName)); } + @Override + public InputStream readBlob(String blobName, long position, int length) throws IOException { + if (position < 0L) { + throw new IllegalArgumentException("position must be non-negative"); + } + if (length < 0) { + throw new IllegalArgumentException("length must be non-negative"); + } + if (length == 0) { + return new ByteArrayInputStream(new byte[0]); + } else { + return new S3RetryingInputStream(blobStore, buildKey(blobName), position, Math.addExact(position, length - 1)); + } + } + /** * This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model. */ diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index cb3a89316f6d7..7f1dd4f9b174d 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -49,6 +49,8 @@ class S3RetryingInputStream extends InputStream { private final S3BlobStore blobStore; private final String blobKey; + private final long start; + private final long end; private final int maxAttempts; private InputStream currentStream; @@ -58,17 +60,32 @@ class S3RetryingInputStream extends InputStream { private boolean closed; S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { + this(blobStore, blobKey, 0, Long.MAX_VALUE - 1); + } + + // both start and end are inclusive bounds, following the definition in GetObjectRequest.setRange + S3RetryingInputStream(S3BlobStore blobStore, String blobKey, long start, long end) throws IOException { + if (start < 0L) { + throw new IllegalArgumentException("start must be non-negative"); + } + if (end < 0L || end == Long.MAX_VALUE || end < start) { + throw new IllegalArgumentException("end must be non-negative and not Long.MAX_VALUE and >= start"); + } this.blobStore = blobStore; this.blobKey = blobKey; this.maxAttempts = blobStore.getMaxRetries() + 1; + this.start = start; + this.end = end; currentStream = openStream(); } private InputStream openStream() throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey); - if (currentOffset > 0) { - getObjectRequest.setRange(currentOffset); + if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { + assert start + currentOffset <= end : + "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end; + getObjectRequest.setRange(Math.addExact(start, currentOffset), end); } final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); return s3Object.getObjectContent(); @@ -125,7 +142,7 @@ private void reopenStreamOrFail(IOException e) throws IOException { throw addSuppressedExceptions(e); } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", - blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e); + blobStore.bucket(), blobKey, start + currentOffset, attempt, maxAttempts), e); attempt += 1; if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { failures.add(e); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 7060082ffcdfa..8af6cfa386e88 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -58,6 +59,7 @@ import java.util.Arrays; import java.util.Locale; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -71,9 +73,11 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs. @@ -139,8 +143,12 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries, public void testReadNonexistentBlobThrowsNoSuchFileException() { final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); - final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); + Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found")); + final long position = randomLongBetween(0, Long.MAX_VALUE - 1); + final int length = randomIntBetween(0, Math.toIntExact(Math.min(Integer.MAX_VALUE, Long.MAX_VALUE - 1 - position))); + assertThat(expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob", position, length)) + .getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found")); } public void testReadBlobWithRetries() throws Exception { @@ -153,6 +161,7 @@ public void testReadBlobWithRetries() throws Exception { if (countDown.countDown()) { final int rangeStart = getRangeStart(exchange); assertThat(rangeStart, lessThan(bytes.length)); + assertEquals(Optional.empty(), getRangeEnd(exchange)); exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart); @@ -178,6 +187,50 @@ public void testReadBlobWithRetries() throws Exception { } } + public void testReadRangeBlobWithRetries() throws Exception { + final int maxRetries = randomInt(5); + final CountDown countDown = new CountDown(maxRetries + 1); + + final byte[] bytes = randomBlobContent(); + httpServer.createContext("/bucket/read_blob_max_retries", exchange -> { + Streams.readFully(exchange.getRequestBody()); + if (countDown.countDown()) { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + assertTrue(getRangeEnd(exchange).isPresent()); + final int rangeEnd = getRangeEnd(exchange).get(); + assertThat(rangeEnd, greaterThanOrEqualTo(rangeStart)); + // adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd); + final int length = (effectiveRangeEnd - rangeStart) + 1; + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, length); + exchange.getResponseBody().write(bytes, rangeStart, length); + exchange.close(); + return; + } + if (randomBoolean()) { + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } else if (randomBoolean()) { + sendIncompleteContent(exchange, bytes); + } + if (randomBoolean()) { + exchange.close(); + } + }); + + final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500)); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null); + final int position = randomIntBetween(0, bytes.length - 1); + final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE); + try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries", position, length)) { + assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), + BytesReference.toBytes(Streams.readFully(inputStream))); + assertThat(countDown.isCountedDown(), is(length > 0)); + } + } + public void testReadBlobWithReadTimeouts() { final int maxRetries = randomInt(5); final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); @@ -194,8 +247,12 @@ public void testReadBlobWithReadTimeouts() { final byte[] bytes = randomBlobContent(); httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes)); + final int position = randomIntBetween(0, bytes.length - 1); + final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE); exception = expectThrows(SocketTimeoutException.class, () -> { - try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) { + try (InputStream stream = randomBoolean() ? + blobContainer.readBlob("read_blob_incomplete") : + blobContainer.readBlob("read_blob_incomplete", position, length)) { Streams.readFully(stream); } }); @@ -209,7 +266,14 @@ public void testReadBlobWithNoHttpResponse() { // HTTP server closes connection immediately httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close); - Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_no_response")); + Exception exception = expectThrows(SdkClientException.class, + () -> { + if (randomBoolean()) { + blobContainer.readBlob("read_blob_no_response"); + } else { + blobContainer.readBlob("read_blob_no_response", 0, 1); + } + }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond")); assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class)); assertThat(exception.getSuppressed().length, equalTo(0)); @@ -227,7 +291,9 @@ public void testReadBlobWithPrematureConnectionClose() { }); final Exception exception = expectThrows(ConnectionClosedException.class, () -> { - try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) { + try (InputStream stream = randomBoolean() ? + blobContainer.readBlob("read_blob_incomplete") : + blobContainer.readBlob("read_blob_no_response", 0, 1)) { Streams.readFully(stream); } }); @@ -397,23 +463,47 @@ private static byte[] randomBlobContent() { return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb } - private static int getRangeStart(HttpExchange exchange) { + private static Tuple getRange(HttpExchange exchange) { final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); if (rangeHeader == null) { - return 0; + return Tuple.tuple(0L, Long.MAX_VALUE - 1); } - final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader); + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader); assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); - return Math.toIntExact(Long.parseLong(matcher.group(1))); + long rangeStart = Long.parseLong(matcher.group(1)); + long rangeEnd = Long.parseLong(matcher.group(2)); + assertThat(rangeStart, lessThanOrEqualTo(rangeEnd)); + return Tuple.tuple(rangeStart, rangeEnd); + } + + private static int getRangeStart(HttpExchange exchange) { + return Math.toIntExact(getRange(exchange).v1()); + } + + private static Optional getRangeEnd(HttpExchange exchange) { + final long rangeEnd = getRange(exchange).v2(); + if (rangeEnd == Long.MAX_VALUE - 1) { + return Optional.empty(); + } + return Optional.of(Math.toIntExact(rangeEnd)); } private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { final int rangeStart = getRangeStart(exchange); assertThat(rangeStart, lessThan(bytes.length)); + final Optional rangeEnd = getRangeEnd(exchange); + final int length; + if (rangeEnd.isPresent()) { + // adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1); + length = effectiveRangeEnd - rangeStart; + } else { + length = bytes.length - rangeStart - 1; + } exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); - final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1); + exchange.sendResponseHeaders(HttpStatus.SC_OK, length); + final int bytesToSend = randomIntBetween(0, length - 1); if (bytesToSend > 0) { exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index d2ef57caacdf1..5e6e8c0b2f026 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -154,7 +154,7 @@ public InputStream readBlob(String name) throws IOException { @Override public InputStream readBlob(String blobName, long position, int length) throws IOException { - final InputStream inputStream = readBlob(blobName); + final InputStream inputStream = readBlob(blobName); // TODO: limit the consumable size of the input stream here to length long skipped = inputStream.skip(position); // NORELEASE assert skipped == position; return inputStream; From 1ea74062339fdfcf972ed07fe4e686f364afefce Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 17 Jan 2020 11:52:28 +0100 Subject: [PATCH 2/8] Simple review comments --- .../s3/S3RetryingInputStream.java | 4 +-- .../s3/S3BlobContainerRetriesTests.java | 25 +++++++++++++------ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 7f1dd4f9b174d..3a3196aa16c0e 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -68,8 +68,8 @@ class S3RetryingInputStream extends InputStream { if (start < 0L) { throw new IllegalArgumentException("start must be non-negative"); } - if (end < 0L || end == Long.MAX_VALUE || end < start) { - throw new IllegalArgumentException("end must be non-negative and not Long.MAX_VALUE and >= start"); + if (end < start || end == Long.MAX_VALUE) { + throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); } this.blobStore = blobStore; this.blobKey = blobKey; diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 8af6cfa386e88..0465cdffaad3a 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -85,6 +85,8 @@ @SuppressForbidden(reason = "use a http server") public class S3BlobContainerRetriesTests extends ESTestCase { + private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1; + private HttpServer httpServer; private S3Service service; @@ -143,10 +145,17 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries, public void testReadNonexistentBlobThrowsNoSuchFileException() { final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null); - Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); + final long position = randomLongBetween(0, MAX_RANGE_VAL); + final int length = randomIntBetween(0, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position))); + final Exception exception = expectThrows(NoSuchFileException.class, + () -> { + if (randomBoolean()) { + blobContainer.readBlob("read_nonexistent_blob"); + } else { + blobContainer.readBlob("read_nonexistent_blob", 0, 1); + } + }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found")); - final long position = randomLongBetween(0, Long.MAX_VALUE - 1); - final int length = randomIntBetween(0, Math.toIntExact(Math.min(Integer.MAX_VALUE, Long.MAX_VALUE - 1 - position))); assertThat(expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob", position, length)) .getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found")); } @@ -192,7 +201,7 @@ public void testReadRangeBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries + 1); final byte[] bytes = randomBlobContent(); - httpServer.createContext("/bucket/read_blob_max_retries", exchange -> { + httpServer.createContext("/bucket/read_range_blob_max_retries", exchange -> { Streams.readFully(exchange.getRequestBody()); if (countDown.countDown()) { final int rangeStart = getRangeStart(exchange); @@ -224,7 +233,7 @@ public void testReadRangeBlobWithRetries() throws Exception { final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null); final int position = randomIntBetween(0, bytes.length - 1); final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE); - try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries", position, length)) { + try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) { assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), BytesReference.toBytes(Streams.readFully(inputStream))); assertThat(countDown.isCountedDown(), is(length > 0)); @@ -293,7 +302,7 @@ public void testReadBlobWithPrematureConnectionClose() { final Exception exception = expectThrows(ConnectionClosedException.class, () -> { try (InputStream stream = randomBoolean() ? blobContainer.readBlob("read_blob_incomplete") : - blobContainer.readBlob("read_blob_no_response", 0, 1)) { + blobContainer.readBlob("read_blob_incomplete", 0, 1)) { Streams.readFully(stream); } }); @@ -466,7 +475,7 @@ private static byte[] randomBlobContent() { private static Tuple getRange(HttpExchange exchange) { final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); if (rangeHeader == null) { - return Tuple.tuple(0L, Long.MAX_VALUE - 1); + return Tuple.tuple(0L, MAX_RANGE_VAL); } final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader); @@ -483,7 +492,7 @@ private static int getRangeStart(HttpExchange exchange) { private static Optional getRangeEnd(HttpExchange exchange) { final long rangeEnd = getRange(exchange).v2(); - if (rangeEnd == Long.MAX_VALUE - 1) { + if (rangeEnd == MAX_RANGE_VAL) { return Optional.empty(); } return Optional.of(Math.toIntExact(rangeEnd)); From ff4361f8f7b6d85631b92c7fbafae679989e265b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 17 Jan 2020 15:41:49 +0100 Subject: [PATCH 3/8] Consume stream when not fully consumed yet --- .../s3/S3RetryingInputStream.java | 14 +++ .../s3/S3BlobContainerRetriesTests.java | 49 ++++++++- .../common/blobstore/fs/FsBlobContainer.java | 4 +- .../org/elasticsearch/common/io/Streams.java | 103 ++++++++++++++++++ .../elasticsearch/common/io/StreamsTests.java | 16 +++ 5 files changed, 179 insertions(+), 7 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 3a3196aa16c0e..5768d135daef4 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; @@ -139,6 +140,8 @@ private void ensureOpen() { private void reopenStreamOrFail(IOException e) throws IOException { if (attempt >= maxAttempts) { + logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], giving up", + blobStore.bucket(), blobKey, start + currentOffset, attempt, maxAttempts), e); throw addSuppressedExceptions(e); } logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying", @@ -147,12 +150,23 @@ private void reopenStreamOrFail(IOException e) throws IOException { if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { failures.add(e); } + try { + Streams.consumeFully(currentStream); + } catch (Exception e2) { + e2.addSuppressed(e); + logger.trace("Failed to fully consume stream on close", e); + } IOUtils.closeWhileHandlingException(currentStream); currentStream = openStream(); } @Override public void close() throws IOException { + try { + Streams.consumeFully(currentStream); + } catch (Exception e) { + logger.trace("Failed to fully consume stream on close", e); + } currentStream.close(); closed = true; } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0465cdffaad3a..d327af5713ffc 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -191,8 +191,28 @@ public void testReadBlobWithRetries() throws Exception { final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500)); final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null); try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) { - assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); - assertThat(countDown.isCountedDown(), is(true)); + final int readLimit; + final InputStream wrappedStream; + if (randomBoolean()) { + // read stream only partly + readLimit = randomIntBetween(0, bytes.length); + wrappedStream = Streams.limitStream(inputStream, readLimit); + } else { + readLimit = bytes.length; + wrappedStream = inputStream; + } + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream)); + logger.info("maxRetries={}, readLimit={}, byteSize={}, bytesRead={}", + maxRetries, readLimit, bytes.length, bytesRead.length); + assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead); + if (readLimit == 0) { + // we will not reach out to the service + assertFalse(countDown.isCountedDown()); + } else if (readLimit < bytes.length) { + // we might have completed things based on an incomplete response, and we're happy with that + } else { + assertTrue(countDown.isCountedDown()); + } } } @@ -234,9 +254,28 @@ public void testReadRangeBlobWithRetries() throws Exception { final int position = randomIntBetween(0, bytes.length - 1); final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE); try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) { - assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), - BytesReference.toBytes(Streams.readFully(inputStream))); - assertThat(countDown.isCountedDown(), is(length > 0)); + final int readLimit; + final InputStream wrappedStream; + if (randomBoolean()) { + // read stream only partly + readLimit = randomIntBetween(0, length); + wrappedStream = Streams.limitStream(inputStream, readLimit); + } else { + readLimit = length; + wrappedStream = inputStream; + } + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream)); + logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}", + maxRetries, position, length, readLimit, bytes.length, bytesRead.length); + assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead); + if (readLimit == 0) { + // we will not reach out to the service + assertFalse(countDown.isCountedDown()); + } else if (readLimit < length && readLimit == bytesRead.length) { + // we might have completed things based on an incomplete response, and we're happy with that + } else { + assertTrue(countDown.isCountedDown()); + } } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 5e6e8c0b2f026..14d6b98321f2e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -154,10 +154,10 @@ public InputStream readBlob(String name) throws IOException { @Override public InputStream readBlob(String blobName, long position, int length) throws IOException { - final InputStream inputStream = readBlob(blobName); // TODO: limit the consumable size of the input stream here to length + final InputStream inputStream = readBlob(blobName); long skipped = inputStream.skip(position); // NORELEASE assert skipped == position; - return inputStream; + return org.elasticsearch.common.io.Streams.limitStream(inputStream, length); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/io/Streams.java b/server/src/main/java/org/elasticsearch/common/io/Streams.java index 222f94e65ef6a..be2c7b4d1674a 100644 --- a/server/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/server/src/main/java/org/elasticsearch/common/io/Streams.java @@ -205,6 +205,13 @@ public static int readFully(InputStream reader, byte[] dest, int offset, int len return read; } + /** + * Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed. + */ + public static long consumeFully(InputStream inputStream) throws IOException { + return copy(inputStream, new NullOutputStream()); + } + public static List readAllLines(InputStream input) throws IOException { final List lines = new ArrayList<>(); readAllLines(input, lines::add); @@ -254,6 +261,13 @@ public static BytesReference readFully(InputStream in) throws IOException { } } + /** + * Limits the given input stream to the provided number of bytes + */ + public static InputStream limitStream(InputStream in, long limit) { + return new LimitedInputStream(in, limit); + } + /** * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is * needed as sometimes a stream will be closed but the bytes that the stream holds still need @@ -297,4 +311,93 @@ public BytesReference bytes() { return delegate.bytes(); } } + + /** + * A wrapper around an {@link InputStream} that limits the number of bytes that can be read from the stream. + */ + static class LimitedInputStream extends FilterInputStream { + + private static final long NO_MARK = -1L; + + private long currentLimit; // is always non-negative + private long limitOnLastMark; + + LimitedInputStream(InputStream in, long limit) { + super(in); + if (limit < 0L) { + throw new IllegalArgumentException("limit must be non-negative"); + } + this.currentLimit = limit; + this.limitOnLastMark = NO_MARK; + } + + @Override + public int read() throws IOException { + final int result; + if (currentLimit == 0 || (result = in.read()) == -1) { + return -1; + } else { + currentLimit--; + return result; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + final int result; + if (currentLimit == 0 || (result = in.read(b, off, Math.toIntExact(Math.min(len, currentLimit)))) == -1) { + return -1; + } else { + currentLimit -= result; + return result; + } + } + + @Override + public long skip(long n) throws IOException { + final long skipped = in.skip(Math.min(n, currentLimit)); + currentLimit -= skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return Math.toIntExact(Math.min(in.available(), currentLimit)); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + limitOnLastMark = currentLimit; + } + + @Override + public synchronized void reset() throws IOException { + in.reset(); + if (limitOnLastMark != NO_MARK) { + currentLimit = limitOnLastMark; + } + } + } + + /** + * OutputStream that just throws all the bytes away + */ + static class NullOutputStream extends OutputStream { + + @Override + public void write(int b) { + + } + + @Override + public void write(byte[] b, int off, int len) { + + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java index ee1933e3a1043..30c8a9c6e499e 100644 --- a/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java @@ -79,4 +79,20 @@ public void testBytesStreamInput() throws IOException { assertEquals(-1, input.read()); input.close(); } + + public void testFullyConsumeInputStream() throws IOException { + final String bytes = randomAlphaOfLengthBetween(0, 100); + final BytesArray stuffArray = new BytesArray(bytes); + assertEquals(bytes.length(), Streams.consumeFully(stuffArray.streamInput())); + } + + public void testLimitInputStream() throws IOException { + final byte[] bytes = randomAlphaOfLengthBetween(1, 100).getBytes(StandardCharsets.UTF_8); + final int limit = randomIntBetween(0, bytes.length); + final BytesArray stuffArray = new BytesArray(bytes); + final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length); + final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out); + assertEquals(limit, count); + assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true)); + } } From 6ec202c332927d1f3cfaf1053102496fc84cc5d3 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 17 Jan 2020 18:24:50 +0100 Subject: [PATCH 4/8] fix test --- .../repositories/s3/S3BlobContainerRetriesTests.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index d327af5713ffc..bb23497617abb 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -338,15 +338,18 @@ public void testReadBlobWithPrematureConnectionClose() { exchange.close(); }); + final boolean useRangeRequest = randomBoolean(); final Exception exception = expectThrows(ConnectionClosedException.class, () -> { - try (InputStream stream = randomBoolean() ? - blobContainer.readBlob("read_blob_incomplete") : - blobContainer.readBlob("read_blob_incomplete", 0, 1)) { + try (InputStream stream = useRangeRequest ? + blobContainer.readBlob("read_blob_incomplete", 0, 1): + blobContainer.readBlob("read_blob_incomplete")) { Streams.readFully(stream); } }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), - containsString("premature end of content-length delimited message body")); + containsString(useRangeRequest ? + "premature end of chunk coded message body: closing chunk expected" : + "premature end of content-length delimited message body")); assertThat(exception.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries))); } From 0b733f010e737199194f8bdbb90138ecc09ca33a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 17 Jan 2020 18:25:47 +0100 Subject: [PATCH 5/8] no need for suppressed exception, already logged --- .../org/elasticsearch/repositories/s3/S3RetryingInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index 5768d135daef4..105f469c905b1 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -153,7 +153,6 @@ private void reopenStreamOrFail(IOException e) throws IOException { try { Streams.consumeFully(currentStream); } catch (Exception e2) { - e2.addSuppressed(e); logger.trace("Failed to fully consume stream on close", e); } IOUtils.closeWhileHandlingException(currentStream); From 65b532d554b5a8e4f2790802a1d1efbaf85c22dc Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 20 Jan 2020 21:36:49 +0100 Subject: [PATCH 6/8] fix tests --- .../s3/S3BlobContainerRetriesTests.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index bb23497617abb..bef4e7b78ea14 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -72,6 +72,7 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -205,10 +206,7 @@ public void testReadBlobWithRetries() throws Exception { logger.info("maxRetries={}, readLimit={}, byteSize={}, bytesRead={}", maxRetries, readLimit, bytes.length, bytesRead.length); assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead); - if (readLimit == 0) { - // we will not reach out to the service - assertFalse(countDown.isCountedDown()); - } else if (readLimit < bytes.length) { + if (readLimit < bytes.length) { // we might have completed things based on an incomplete response, and we're happy with that } else { assertTrue(countDown.isCountedDown()); @@ -268,10 +266,7 @@ public void testReadRangeBlobWithRetries() throws Exception { logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}", maxRetries, position, length, readLimit, bytes.length, bytesRead.length); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead); - if (readLimit == 0) { - // we will not reach out to the service - assertFalse(countDown.isCountedDown()); - } else if (readLimit < length && readLimit == bytesRead.length) { + if (readLimit < length && readLimit == bytesRead.length) { // we might have completed things based on an incomplete response, and we're happy with that } else { assertTrue(countDown.isCountedDown()); @@ -297,14 +292,16 @@ public void testReadBlobWithReadTimeouts() { final int position = randomIntBetween(0, bytes.length - 1); final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE); - exception = expectThrows(SocketTimeoutException.class, () -> { + exception = expectThrows(IOException.class, () -> { try (InputStream stream = randomBoolean() ? blobContainer.readBlob("read_blob_incomplete") : blobContainer.readBlob("read_blob_incomplete", position, length)) { Streams.readFully(stream); } }); - assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); + assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))); + assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or( + containsString("Premature end of chunk coded message body: closing chunk expected"))); assertThat(exception.getSuppressed().length, equalTo(maxRetries)); } From 4ee2d9ffe1f0eeb44bfd1be9e1cd6dcd78331131 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 20 Jan 2020 21:56:57 +0100 Subject: [PATCH 7/8] moar fixes --- .../repositories/s3/S3BlobContainerRetriesTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index bef4e7b78ea14..d89ba7b692725 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -266,7 +266,7 @@ public void testReadRangeBlobWithRetries() throws Exception { logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}", maxRetries, position, length, readLimit, bytes.length, bytesRead.length); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead); - if (readLimit < length && readLimit == bytesRead.length) { + if (readLimit == 0 || (readLimit < length && readLimit == bytesRead.length)) { // we might have completed things based on an incomplete response, and we're happy with that } else { assertTrue(countDown.isCountedDown()); @@ -301,7 +301,7 @@ public void testReadBlobWithReadTimeouts() { }); assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or( - containsString("Premature end of chunk coded message body: closing chunk expected"))); + containsString("premature end of chunk coded message body: closing chunk expected"))); assertThat(exception.getSuppressed().length, equalTo(maxRetries)); } From cf193742cff6b9dabace211b559e7ac48a6c61ad Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 20 Jan 2020 23:07:08 +0100 Subject: [PATCH 8/8] yet more fixes --- .../repositories/s3/S3BlobContainerRetriesTests.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index d89ba7b692725..b0fe29a4217a7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -335,18 +335,16 @@ public void testReadBlobWithPrematureConnectionClose() { exchange.close(); }); - final boolean useRangeRequest = randomBoolean(); final Exception exception = expectThrows(ConnectionClosedException.class, () -> { - try (InputStream stream = useRangeRequest ? + try (InputStream stream = randomBoolean() ? blobContainer.readBlob("read_blob_incomplete", 0, 1): blobContainer.readBlob("read_blob_incomplete")) { Streams.readFully(stream); } }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), - containsString(useRangeRequest ? - "premature end of chunk coded message body: closing chunk expected" : - "premature end of content-length delimited message body")); + either(containsString("premature end of chunk coded message body: closing chunk expected")) + .or(containsString("premature end of content-length delimited message body"))); assertThat(exception.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries))); }