diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 849e25d71d98..99ee7a976817 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
   @Deprecated
   public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
 
+  /**
+   * Number of times to retry an S3 read operation.
+   */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 7;
+
+  /**
+   * Minimum wait time before retrying an S3 read operation.
+   */
+  public static final String S3_READ_RETRY_MIN_WAIT_MS = "s3.read.retry.min-wait-ms";
+  public static final long S3_READ_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds
+
+  /**
+   * Maximum wait time before retrying an S3 read operation.
+   */
+  public static final String S3_READ_RETRY_MAX_WAIT_MS = "s3.read.retry.max-wait-ms";
+  public static final long S3_READ_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes
+
+  /**
+   * Total time allowed for retrying an S3 read operation.
+   */
+  public static final String S3_READ_RETRY_TOTAL_TIMEOUT_MS = "s3.read.retry.total-timeout-ms";
+  public static final long S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
+
   /**
    * Used by {@link LakeFormationAwsClientFactory}.
    * The table name used as part of lake formation credentials request.
@@ -380,6 +404,10 @@ public class AwsProperties implements Serializable {
   private int s3FileIoDeleteThreads;
   private boolean isS3DeleteEnabled;
   private final Map<String, String> s3BucketToAccessPointMapping;
+  private int s3ReadRetryNumRetries;
+  private long s3ReadRetryMinWaitMs;
+  private long s3ReadRetryMaxWaitMs;
+  private long s3ReadRetryTotalTimeoutMs;
 
   private String glueCatalogId;
   private boolean glueCatalogSkipArchive;
@@ -404,6 +432,10 @@ public AwsProperties() {
     this.s3FileIoDeleteThreads = Runtime.getRuntime().availableProcessors();
     this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
     this.s3BucketToAccessPointMapping = ImmutableMap.of();
+    this.s3ReadRetryNumRetries = S3_READ_RETRY_NUM_RETRIES_DEFAULT;
+    this.s3ReadRetryMinWaitMs = S3_READ_RETRY_MIN_WAIT_MS_DEFAULT;
+    this.s3ReadRetryMaxWaitMs = S3_READ_RETRY_MAX_WAIT_MS_DEFAULT;
+    this.s3ReadRetryTotalTimeoutMs = S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;
 
     this.glueCatalogId = null;
     this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
@@ -472,6 +504,14 @@ public AwsProperties(Map<String, String> properties) {
         Runtime.getRuntime().availableProcessors());
     this.isS3DeleteEnabled = PropertyUtil.propertyAsBoolean(properties, S3_DELETE_ENABLED, S3_DELETE_ENABLED_DEFAULT);
     this.s3BucketToAccessPointMapping = PropertyUtil.propertiesWithPrefix(properties, S3_ACCESS_POINTS_PREFIX);
+    this.s3ReadRetryNumRetries = PropertyUtil.propertyAsInt(properties, S3_READ_RETRY_NUM_RETRIES,
+        S3_READ_RETRY_NUM_RETRIES_DEFAULT);
+    this.s3ReadRetryMinWaitMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_MIN_WAIT_MS,
+        S3_READ_RETRY_MIN_WAIT_MS_DEFAULT);
+    this.s3ReadRetryMaxWaitMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_MAX_WAIT_MS,
+        S3_READ_RETRY_MAX_WAIT_MS_DEFAULT);
+    this.s3ReadRetryTotalTimeoutMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_TOTAL_TIMEOUT_MS,
+        S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);
 
     this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
         DYNAMODB_TABLE_NAME_DEFAULT);
@@ -613,6 +653,38 @@ public void setS3DeleteEnabled(boolean s3DeleteEnabled) {
     this.isS3DeleteEnabled = s3DeleteEnabled;
   }
 
+  public int s3ReadRetryNumRetries() {
+    return s3ReadRetryNumRetries;
+  }
+
+  public void setS3ReadRetryNumRetries(int s3ReadRetryNumRetries) {
+    this.s3ReadRetryNumRetries = s3ReadRetryNumRetries;
+  }
+
+  public long s3ReadRetryMinWaitMs() {
+    return s3ReadRetryMinWaitMs;
+  }
+
+  public void setS3ReadRetryMinWaitMs(long s3ReadRetryMinWaitMs) {
+    this.s3ReadRetryMinWaitMs = s3ReadRetryMinWaitMs;
+  }
+
+  public long s3ReadRetryMaxWaitMs() {
+    return s3ReadRetryMaxWaitMs;
+  }
+
+  public void setS3ReadRetryMaxWaitMs(long s3ReadRetryMaxWaitMs) {
+    this.s3ReadRetryMaxWaitMs = s3ReadRetryMaxWaitMs;
+  }
+
+  public long s3ReadRetryTotalTimeoutMs() {
+    return s3ReadRetryTotalTimeoutMs;
+  }
+
+  public void setS3ReadRetryTotalTimeoutMs(long s3ReadRetryTotalTimeoutMs) {
+    this.s3ReadRetryTotalTimeoutMs = s3ReadRetryTotalTimeoutMs;
+  }
+
   private Set<Tag> toTags(Map<String, String> properties, String prefix) {
     return PropertyUtil.propertiesWithPrefix(properties, prefix)
         .entrySet().stream()
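The new keys are read through the AwsProperties(Map<String, String>) constructor above, so they can be supplied like any other S3 FileIO property. A minimal sketch of how they might be exercised directly (illustrative only, not part of this patch; the values are placeholders):

import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ReadRetryPropertiesSketch {
  public static void main(String[] args) {
    AwsProperties awsProperties = new AwsProperties(ImmutableMap.of(
        AwsProperties.S3_READ_RETRY_NUM_RETRIES, "5",
        AwsProperties.S3_READ_RETRY_MIN_WAIT_MS, "250",
        AwsProperties.S3_READ_RETRY_MAX_WAIT_MS, "60000",
        AwsProperties.S3_READ_RETRY_TOTAL_TIMEOUT_MS, "300000"));

    // keys that are not set fall back to the *_DEFAULT constants above
    System.out.println(awsProperties.s3ReadRetryNumRetries());     // 5
    System.out.println(awsProperties.s3ReadRetryTotalTimeoutMs()); // 300000
  }
}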
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index 92d0f3f3eae9..9c6dab8a382b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -19,8 +19,11 @@
 
 package org.apache.iceberg.aws.s3;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.HttpURLConnection;
 import java.util.Arrays;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.FileIOMetricsContext;
@@ -33,11 +36,16 @@
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.AbortedException;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.Abortable;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 class S3InputStream extends SeekableInputStream implements RangeReadable {
   private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -88,23 +96,69 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] byteRef = new int[1];
+    try {
+      Tasks.foreach(0)
+          .retry(awsProperties.s3ReadRetryNumRetries())
+          .exponentialBackoff(
+              awsProperties.s3ReadRetryMinWaitMs(),
+              awsProperties.s3ReadRetryMaxWaitMs(),
+              awsProperties.s3ReadRetryTotalTimeoutMs(),
+              2.0 /* exponential */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(ignored -> {
+            try {
+              Preconditions.checkState(!closed, "Cannot read: already closed");
+              positionStream();
+
+              byteRef[0] = stream.read();
+            } catch (IOException e) {
+              closeStream();
+              throw new UncheckedIOException(e);
+            }
+          });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
 
     pos += 1;
     next += 1;
     readBytes.increment();
     readOperations.increment();
 
-    return stream.read();
+    return byteRef[0];
   }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] bytesReadRef = new int[1];
+    try {
+      Tasks.foreach(0)
+          .retry(awsProperties.s3ReadRetryNumRetries())
+          .exponentialBackoff(
+              awsProperties.s3ReadRetryMinWaitMs(),
+              awsProperties.s3ReadRetryMaxWaitMs(),
+              awsProperties.s3ReadRetryTotalTimeoutMs(),
+              2.0 /* exponential */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(ignored -> {
+            try {
+              Preconditions.checkState(!closed, "Cannot read: already closed");
+              positionStream();
+
+              bytesReadRef[0] = stream.read(b, off, len);
+            } catch (IOException e) {
+              closeStream();
+              throw new UncheckedIOException(e);
+            }
+          });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
 
-    int bytesRead = stream.read(b, off, len);
+    int bytesRead = bytesReadRef[0];
     pos += bytesRead;
     next += bytesRead;
     readBytes.increment((long) bytesRead);
@@ -118,8 +172,30 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro
     Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);
 
     String range = String.format("bytes=%s-%s", position, position + length - 1);
-
-    IOUtil.readFully(readRange(range), buffer, offset, length);
+    try {
+      Tasks.foreach(0)
+          .retry(awsProperties.s3ReadRetryNumRetries())
+          .exponentialBackoff(
+              awsProperties.s3ReadRetryMinWaitMs(),
+              awsProperties.s3ReadRetryMaxWaitMs(),
+              awsProperties.s3ReadRetryTotalTimeoutMs(),
+              2.0 /* exponential */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(ignored -> {
+            InputStream rangeStream = null;
+            try {
+              rangeStream = readRange(range);
+              IOUtil.readFully(rangeStream, buffer, offset, length);
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            } finally {
+              closeServerSideStream(rangeStream);
+            }
+          });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   @Override
@@ -127,8 +203,34 @@ public int readTail(byte[] buffer, int offset, int length) throws IOException {
     Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);
 
     String range = String.format("bytes=-%s", length);
 
+    int[] bytesReadRef = new int[1];
+
+    try {
+      Tasks.foreach(0)
+          .retry(awsProperties.s3ReadRetryNumRetries())
+          .exponentialBackoff(
+              awsProperties.s3ReadRetryMinWaitMs(),
+              awsProperties.s3ReadRetryMaxWaitMs(),
+              awsProperties.s3ReadRetryTotalTimeoutMs(),
+              2.0 /* exponential */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(ignored -> {
+            InputStream rangeStream = null;
+            try {
+              rangeStream = readRange(range);
+              bytesReadRef[0] = IOUtil.readRemaining(rangeStream, buffer, offset, length);
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            } finally {
+              closeServerSideStream(rangeStream);
+            }
+          });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
 
-    return IOUtil.readRemaining(readRange(range), buffer, offset, length);
+    return bytesReadRef[0];
   }
 
   private InputStream readRange(String range) {
@@ -172,31 +274,62 @@ private void positionStream() throws IOException {
     }
 
     // close the stream and open at desired position
-    LOG.debug("Seek with new stream for {} to offset {}", location, next);
+    LOG.warn("Seek with new stream for {} to offset {}", location, next);
     pos = next;
     openStream();
   }
 
-  private void openStream() throws IOException {
-    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
-        .bucket(location.bucket())
-        .key(location.key())
-        .range(String.format("bytes=%s-", pos));
-
-    S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
+  private void openStream() {
     closeStream();
-    stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream = readRange(String.format("bytes=%s-", pos));
+  }
+
+  private void closeStream() {
+    closeServerSideStream(stream);
+    stream = null;
   }
 
-  private void closeStream() throws IOException {
-    if (stream != null) {
-      stream.close();
+  private static void closeServerSideStream(InputStream streamToClose) {
+    if (streamToClose != null) {
+      try {
+        if (streamToClose instanceof Abortable) {
+          // Stated in the ResponseInputStream javadoc:
+          // If it is not desired to read remaining data from the stream,
+          // you can explicitly abort the connection via abort().
+          ((Abortable) streamToClose).abort();
+        } else {
+          streamToClose.close();
+        }
+      } catch (IOException | AbortedException e) {
+        // ignore failure to abort or close stream
+      }
     }
   }
 
-  public void setSkipSize(int skipSize) {
-    this.skipSize = skipSize;
+  private static boolean shouldRetry(Exception exception) {
+    if (exception instanceof UncheckedIOException) {
+      if (exception.getCause() instanceof EOFException) {
+        return false;
+      }
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 416: // range not satisfied
+          return false;
+      }
+    }
+
+    return true;
   }
 
   @SuppressWarnings("checkstyle:NoFinalizer")
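With the defaults introduced in AwsProperties, a failed read is attempted once plus up to 7 retries. A back-of-the-envelope sketch of the resulting wait schedule; this assumes Tasks' exponentialBackoff scales the wait by the 2.0 factor on each attempt and caps it at the max wait, and the helper below only prints the schedule rather than calling Tasks:

import java.util.concurrent.TimeUnit;

public class ReadRetryBackoffSketch {
  public static void main(String[] args) {
    long wait = 500;                                    // s3.read.retry.min-wait-ms default
    long maxWait = TimeUnit.MINUTES.toMillis(2);        // s3.read.retry.max-wait-ms default
    long totalTimeout = TimeUnit.MINUTES.toMillis(10);  // s3.read.retry.total-timeout-ms default
    long elapsed = 0;
    for (int retry = 1; retry <= 7 && elapsed < totalTimeout; retry++) {
      System.out.printf("retry %d after ~%d ms%n", retry, wait); // 500, 1000, 2000, 4000, ...
      elapsed += wait;
      wait = Math.min((long) (wait * 2.0), maxWait);
    }
  }
}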
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java
new file mode 100644
index 000000000000..0d1744b124f5
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.s3;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import javax.net.ssl.SSLException;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.aws.AwsProperties;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.InvalidObjectStateException;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+@RunWith(Parameterized.class)
+public class TestFuzzyS3InputStream extends TestS3InputStream {
+
+  private final Exception exception;
+  private final boolean shouldRetry;
+
+  public TestFuzzyS3InputStream(Exception exception, boolean shouldRetry) {
+    this.exception = exception;
+    this.shouldRetry = shouldRetry;
+  }
+
+  @Parameterized.Parameters(name = "exception = {0}, shouldRetry = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { new IOException("random failure"), true },
+        { new SSLException("client connection reset"), true },
+        { new SocketTimeoutException("client connection reset"), true },
+        { new EOFException("failure"), false },
+        { AwsServiceException.builder().statusCode(403).message("failure").build(), false },
+        { AwsServiceException.builder().statusCode(400).message("failure").build(), false },
+        { S3Exception.builder().statusCode(404).message("failure").build(), false },
+        { S3Exception.builder().statusCode(416).message("failure").build(), false }
+    };
+  }
+
+  @Test
+  public void testReadWithFuzzyClientRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRead(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @Test
+  public void testReadWithFuzzyStreamRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRead(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @Test
+  public void testReadWithFuzzyClientRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testRead(fuzzyClient(counter, exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testReadWithFuzzyStreamRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testRead(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testSeekWithFuzzyClientRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testSeek(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @Test
+  public void testSeekWithFuzzyStreamRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testSeek(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @Test
+  public void testSeekWithFuzzyClientRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testSeek(fuzzyClient(counter, exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testSeekWithFuzzyStreamRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testSeek(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testRangeReadWithFuzzyClientRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRangeRead(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @Test
+  public void testRangeReadWithFuzzyStreamRetrySucceed() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRangeRead(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @Test
+  public void testRangeReadWithFuzzyClientRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testRangeRead(fuzzyClient(counter, exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testRangeReadWithFuzzyStreamRetryFail() throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail((counter, awsProperties) -> {
+      try {
+        testRangeRead(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testExceptionNoRetry() throws Exception {
+    Assume.assumeFalse(shouldRetry);
+    testReadExceptionShouldNotRetry();
+  }
+
+  private void testRetryFail(BiConsumer<AtomicInteger, AwsProperties> runnable) throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(2);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    AtomicInteger counter = new AtomicInteger(4);
+    AssertHelpers.assertThrowsCause("Should fail after retries",
+        exception.getClass(),
+        exception.getMessage(),
+        () -> runnable.accept(counter, awsProperties));
+    Assert.assertEquals("Should have 3 invocations (1 initial call, 2 retries)", 3, 4 - counter.get());
+  }
+
+  private void testReadExceptionShouldNotRetry() throws Exception {
+    AwsProperties awsProperties = new AwsProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    AtomicInteger counter = new AtomicInteger(3);
+    AssertHelpers.assertThrowsCause("Should fail without retry",
+        exception.getClass(),
+        exception.getMessage(),
+        () -> {
+          try {
+            testRead(fuzzyClient(counter, exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+    Assert.assertEquals("Should have 1 invocation (1 initial call, no retry)", 1, 3 - counter.get());
+  }
+
+  private S3Client fuzzyClient(AtomicInteger counter, Exception failure) {
+    S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client()));
+    int round = counter.get();
+
+    // for every round of n invocations, only the last call succeeds
+    doAnswer(invocation -> {
+      if (counter.decrementAndGet() == 0) {
+        counter.set(round);
+        return invocation.callRealMethod();
+      } else {
+        throw failure;
+      }
+    }).when(fuzzyClient).getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+
+    return fuzzyClient;
+  }
+
+  private S3Client fuzzyStreamClient(AtomicInteger counter, IOException failure) {
+    S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client()));
+    doAnswer(invocation -> new FuzzyResponseInputStream(invocation.callRealMethod(), counter, failure))
+        .when(fuzzyClient).getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    return fuzzyClient;
+  }
+
+  /**
+   * Wrapper for S3 client, used to mock the final class DefaultS3Client
+   */
+  public static class S3ClientWrapper implements S3Client {
+
+    private final S3Client delegate;
+
+    public S3ClientWrapper(S3Client delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public String serviceName() {
+      return delegate.serviceName();
+    }
+
+    @Override
+    public void close() {
+      delegate.close();
+    }
+
+    @Override
+    public <ReturnT> ReturnT getObject(
+        GetObjectRequest getObjectRequest,
+        ResponseTransformer<GetObjectResponse, ReturnT> responseTransformer)
+        throws NoSuchKeyException, InvalidObjectStateException, AwsServiceException, SdkClientException, S3Exception {
+      return delegate.getObject(getObjectRequest, responseTransformer);
+    }
+
+    @Override
+    public HeadObjectResponse headObject(
+        HeadObjectRequest headObjectRequest)
+        throws NoSuchKeyException, AwsServiceException, SdkClientException, S3Exception {
+      return delegate.headObject(headObjectRequest);
+    }
+
+    @Override
+    public PutObjectResponse putObject(
+        PutObjectRequest putObjectRequest,
+        RequestBody requestBody)
+        throws AwsServiceException, SdkClientException, S3Exception {
+      return delegate.putObject(putObjectRequest, requestBody);
+    }
+
+    @Override
+    public CreateBucketResponse createBucket(
+        CreateBucketRequest createBucketRequest)
+        throws BucketAlreadyExistsException, BucketAlreadyOwnedByYouException,
+        AwsServiceException, SdkClientException, S3Exception {
+      return delegate.createBucket(createBucketRequest);
+    }
+  }
+
+  public static class FuzzyResponseInputStream extends InputStream {
+
+    private final ResponseInputStream<GetObjectResponse> delegate;
+    private final AtomicInteger counter;
+    private final int round;
+    private final IOException exception;
+
+    public FuzzyResponseInputStream(Object invocationResponse, AtomicInteger counter, IOException exception) {
+      this.delegate = (ResponseInputStream<GetObjectResponse>) invocationResponse;
+      this.counter = counter;
+      this.round = counter.get();
+      this.exception = exception;
+    }
+
+    private void checkCounter() throws IOException {
+      // for every round of n invocations, only the last call succeeds
+      if (counter.decrementAndGet() == 0) {
+        counter.set(round);
+      } else {
+        throw exception;
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      checkCounter();
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      checkCounter();
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      checkCounter();
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+      return delegate.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      delegate.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      delegate.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return delegate.markSupported();
+    }
+  }
+}
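Both fuzzy helpers above share the same counter contract ("for every round of n invocations, only the last call succeeds"). A standalone, illustrative sketch of that behaviour, not part of the patch:

import java.util.concurrent.atomic.AtomicInteger;

public class FuzzyCounterSketch {
  public static void main(String[] args) {
    AtomicInteger counter = new AtomicInteger(3);
    int round = counter.get();
    for (int call = 1; call <= 6; call++) {
      // mirrors checkCounter()/fuzzyClient(): decrement, succeed only on zero, then reset the round
      boolean succeeds = counter.decrementAndGet() == 0;
      if (succeeds) {
        counter.set(round);
      }
      System.out.println("call " + call + (succeeds ? " succeeds" : " fails"));
    }
    // prints: fails, fails, succeeds, fails, fails, succeeds
  }
}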
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
index 417ba16f71ad..0825935ed2ad 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
@@ -24,8 +24,10 @@
 import java.util.Arrays;
 import java.util.Random;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -42,23 +44,27 @@ public class TestS3InputStream {
   @ClassRule
   public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
 
-  private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
+  private final S3Client s3Client = S3_MOCK_RULE.createS3ClientV2();
   private final Random random = new Random(1);
 
   @Before
   public void before() {
-    s3.createBucket(CreateBucketRequest.builder().bucket("bucket").build());
+    s3Client.createBucket(CreateBucketRequest.builder().bucket("bucket").build());
   }
 
   @Test
   public void testRead() throws Exception {
+    testRead(s3Client, new AwsProperties());
+  }
+
+  protected void testRead(S3Client s3, AwsProperties awsProperties) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/read.dat");
     int dataSize = 1024 * 1024 * 10;
     byte[] data = randomData(dataSize);
 
     writeS3Data(uri, data);
 
-    try (SeekableInputStream in = new S3InputStream(s3, uri)) {
+    try (SeekableInputStream in = new S3InputStream(s3, uri, awsProperties, MetricsContext.nullMetrics())) {
       int readSize = 1024;
       byte [] actual = new byte[readSize];
 
@@ -108,6 +114,10 @@ private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byt
 
   @Test
   public void testRangeRead() throws Exception {
+    testRangeRead(s3Client, new AwsProperties());
+  }
+
+  protected void testRangeRead(S3Client s3, AwsProperties awsProperties) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat");
     int dataSize = 1024 * 1024 * 10;
     byte[] expected = randomData(dataSize);
@@ -119,7 +129,7 @@ public void testRangeRead() throws Exception {
 
     writeS3Data(uri, expected);
 
-    try (RangeReadable in = new S3InputStream(s3, uri)) {
+    try (RangeReadable in = new S3InputStream(s3, uri, awsProperties, MetricsContext.nullMetrics())) {
       // first 1k
       position = 0;
       offset = 0;
@@ -152,19 +162,23 @@ private void readAndCheckRanges(
   @Test
   public void testClose() throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/closed.dat");
-    SeekableInputStream closed = new S3InputStream(s3, uri);
+    SeekableInputStream closed = new S3InputStream(s3Client, uri);
     closed.close();
     assertThrows(IllegalStateException.class, () -> closed.seek(0));
   }
 
   @Test
   public void testSeek() throws Exception {
+    testSeek(s3Client, new AwsProperties());
+  }
+
+  protected void testSeek(S3Client s3, AwsProperties awsProperties) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/seek.dat");
     byte[] expected = randomData(1024 * 1024);
 
     writeS3Data(uri, expected);
 
-    try (SeekableInputStream in = new S3InputStream(s3, uri)) {
+    try (SeekableInputStream in = new S3InputStream(s3, uri, awsProperties, MetricsContext.nullMetrics())) {
      in.seek(expected.length / 2);
       byte[] actual = IOUtils.readFully(in, expected.length / 2);
       assertArrayEquals(Arrays.copyOfRange(expected, expected.length / 2, expected.length), actual);
@@ -178,7 +192,7 @@ private byte[] randomData(int size) {
   }
 
   private void writeS3Data(S3URI uri, byte[] data) throws IOException {
-    s3.putObject(
+    s3Client.putObject(
         PutObjectRequest.builder()
             .bucket(uri.bucket())
             .key(uri.key())
@@ -186,4 +200,8 @@ private void writeS3Data(S3URI uri, byte[] data) throws IOException {
             .build(),
         RequestBody.fromBytes(data));
   }
+
+  protected S3Client s3Client() {
+    return s3Client;
+  }
 }
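End to end, a reader would normally pick these properties up through S3FileIO rather than building AwsProperties by hand. A rough sketch, under the assumption that S3FileIO.initialize(Map) forwards its properties into the AwsProperties constructor changed above; the bucket, key, and credential/client setup are placeholders and omitted:

import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class RetryingReadSketch {
  public static void main(String[] args) throws Exception {
    S3FileIO io = new S3FileIO();
    io.initialize(ImmutableMap.of(
        "s3.read.retry.num-retries", "5",
        "s3.read.retry.min-wait-ms", "250"));

    InputFile file = io.newInputFile("s3://my-bucket/path/to/data.parquet"); // placeholder path
    try (SeekableInputStream in = file.newStream()) {
      byte[] header = new byte[4];
      in.read(header, 0, header.length); // transient S3/socket failures are retried per the properties above
    }
  }
}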