diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index b3801d3f3621..57e54f8fd41f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -385,6 +385,46 @@ public class S3FileIOProperties implements Serializable {
    */
   private static final String S3_FILE_IO_USER_AGENT = "s3fileio/" + EnvironmentContext.get();
 
+  /** Number of times to retry an S3 read operation. */
+  public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+
+  public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 7;
+
+  /** Minimum wait time before retrying an S3 read operation. */
+  public static final String S3_READ_RETRY_MIN_WAIT_MS = "s3.read.retry.min-wait-ms";
+
+  public static final long S3_READ_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds
+
+  /** Maximum wait time before retrying an S3 read operation. */
+  public static final String S3_READ_RETRY_MAX_WAIT_MS = "s3.read.retry.max-wait-ms";
+
+  public static final long S3_READ_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes
+
+  /** Total retry timeout for an S3 read operation. */
+  public static final String S3_READ_RETRY_TOTAL_TIMEOUT_MS = "s3.read.retry.total-timeout-ms";
+
+  public static final long S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
+
+  /** Number of times to retry an S3 write operation. */
+  public static final String S3_WRITE_RETRY_NUM_RETRIES = "s3.write.retry.num-retries";
+
+  public static final int S3_WRITE_RETRY_NUM_RETRIES_DEFAULT = 7;
+
+  /** Minimum wait time before retrying an S3 write operation. */
+  public static final String S3_WRITE_RETRY_MIN_WAIT_MS = "s3.write.retry.min-wait-ms";
+
+  public static final long S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds
+
+  /** Maximum wait time before retrying an S3 write operation. */
+  public static final String S3_WRITE_RETRY_MAX_WAIT_MS = "s3.write.retry.max-wait-ms";
+
+  public static final long S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes
+
+  /** Total retry timeout for an S3 write operation. */
+  public static final String S3_WRITE_RETRY_TOTAL_TIMEOUT_MS = "s3.write.retry.total-timeout-ms";
+
+  public static final long S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
+
   private String sseType;
   private String sseKey;
   private String sseMd5;
@@ -408,6 +448,14 @@ public class S3FileIOProperties implements Serializable {
   private boolean isDeleteEnabled;
   private final Map<String, String> bucketToAccessPointMapping;
   private boolean isPreloadClientEnabled;
+  private int s3ReadRetryNumRetries;
+  private long s3ReadRetryMinWaitMs;
+  private long s3ReadRetryMaxWaitMs;
+  private long s3ReadRetryTotalTimeoutMs;
+  private int s3WriteRetryNumRetries;
+  private long s3WriteRetryMinWaitMs;
+  private long s3WriteRetryMaxWaitMs;
+  private long s3WriteRetryTotalTimeoutMs;
   private boolean isDualStackEnabled;
   private boolean isPathStyleAccess;
   private boolean isUseArnRegionEnabled;
@@ -440,6 +488,14 @@ public S3FileIOProperties() {
     this.isDeleteEnabled = DELETE_ENABLED_DEFAULT;
     this.bucketToAccessPointMapping = Collections.emptyMap();
     this.isPreloadClientEnabled = PRELOAD_CLIENT_ENABLED_DEFAULT;
+    this.s3ReadRetryNumRetries = S3_READ_RETRY_NUM_RETRIES_DEFAULT;
+    this.s3ReadRetryMinWaitMs = S3_READ_RETRY_MIN_WAIT_MS_DEFAULT;
+    this.s3ReadRetryMaxWaitMs = S3_READ_RETRY_MAX_WAIT_MS_DEFAULT;
+    this.s3ReadRetryTotalTimeoutMs = S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;
+    this.s3WriteRetryNumRetries = S3_WRITE_RETRY_NUM_RETRIES_DEFAULT;
+    this.s3WriteRetryMinWaitMs = S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT;
+    this.s3WriteRetryMaxWaitMs = S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT;
+    this.s3WriteRetryTotalTimeoutMs = S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;
     this.isDualStackEnabled = DUALSTACK_ENABLED_DEFAULT;
     this.isPathStyleAccess = PATH_STYLE_ACCESS_DEFAULT;
     this.isUseArnRegionEnabled = USE_ARN_REGION_ENABLED_DEFAULT;
@@ -532,6 +588,30 @@ public S3FileIOProperties(Map<String, String> properties) {
     this.isPreloadClientEnabled =
         PropertyUtil.propertyAsBoolean(
             properties, PRELOAD_CLIENT_ENABLED, PRELOAD_CLIENT_ENABLED_DEFAULT);
+    this.s3ReadRetryNumRetries =
+        PropertyUtil.propertyAsInt(
+            properties, S3_READ_RETRY_NUM_RETRIES, S3_READ_RETRY_NUM_RETRIES_DEFAULT);
+    this.s3ReadRetryMinWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_READ_RETRY_MIN_WAIT_MS, S3_READ_RETRY_MIN_WAIT_MS_DEFAULT);
+    this.s3ReadRetryMaxWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_READ_RETRY_MAX_WAIT_MS, S3_READ_RETRY_MAX_WAIT_MS_DEFAULT);
+    this.s3ReadRetryTotalTimeoutMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_READ_RETRY_TOTAL_TIMEOUT_MS, S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);
+    this.s3WriteRetryNumRetries =
+        PropertyUtil.propertyAsInt(
+            properties, S3_WRITE_RETRY_NUM_RETRIES, S3_WRITE_RETRY_NUM_RETRIES_DEFAULT);
+    this.s3WriteRetryMinWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_WRITE_RETRY_MIN_WAIT_MS, S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT);
+    this.s3WriteRetryMaxWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_WRITE_RETRY_MAX_WAIT_MS, S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT);
+    this.s3WriteRetryTotalTimeoutMs =
+        PropertyUtil.propertyAsLong(
+            properties, S3_WRITE_RETRY_TOTAL_TIMEOUT_MS, S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);
     this.isRemoteSigningEnabled =
         PropertyUtil.propertyAsBoolean(
             properties, REMOTE_SIGNING_ENABLED, REMOTE_SIGNING_ENABLED_DEFAULT);
@@ -723,6 +803,70 @@ public String writeStorageClass() {
     return writeStorageClass;
   }
 
+  public int s3ReadRetryNumRetries() {
+    return s3ReadRetryNumRetries;
+  }
+
+  public void setS3ReadRetryNumRetries(int s3ReadRetryNumRetries) {
+    this.s3ReadRetryNumRetries = s3ReadRetryNumRetries;
+  }
+
+  public long s3ReadRetryMinWaitMs() {
+    return s3ReadRetryMinWaitMs;
+  }
+
+  public void setS3ReadRetryMinWaitMs(long s3ReadRetryMinWaitMs) {
+    this.s3ReadRetryMinWaitMs = s3ReadRetryMinWaitMs;
+  }
+
+  public long s3ReadRetryMaxWaitMs() {
+    return s3ReadRetryMaxWaitMs;
+  }
+
+  public void setS3ReadRetryMaxWaitMs(long s3ReadRetryMaxWaitMs) {
+    this.s3ReadRetryMaxWaitMs = s3ReadRetryMaxWaitMs;
+  }
+
+  public long s3ReadRetryTotalTimeoutMs() {
+    return s3ReadRetryTotalTimeoutMs;
+  }
+
+  public void setS3ReadRetryTotalTimeoutMs(long s3ReadRetryTotalTimeoutMs) {
+    this.s3ReadRetryTotalTimeoutMs = s3ReadRetryTotalTimeoutMs;
+  }
+
+  public int s3WriteRetryNumRetries() {
+    return s3WriteRetryNumRetries;
+  }
+
+  public void setS3WriteRetryNumRetries(int s3WriteRetryNumRetries) {
+    this.s3WriteRetryNumRetries = s3WriteRetryNumRetries;
+  }
+
+  public long s3WriteRetryMinWaitMs() {
+    return s3WriteRetryMinWaitMs;
+  }
+
+  public void setS3WriteRetryMinWaitMs(long s3WriteRetryMinWaitMs) {
+    this.s3WriteRetryMinWaitMs = s3WriteRetryMinWaitMs;
+  }
+
+  public long s3WriteRetryMaxWaitMs() {
+    return s3WriteRetryMaxWaitMs;
+  }
+
+  public void setS3WriteRetryMaxWaitMs(long s3WriteRetryMaxWaitMs) {
+    this.s3WriteRetryMaxWaitMs = s3WriteRetryMaxWaitMs;
+  }
+
+  public long s3WriteRetryTotalTimeoutMs() {
+    return s3WriteRetryTotalTimeoutMs;
+  }
+
+  public void setS3WriteRetryTotalTimeoutMs(long s3WriteRetryTotalTimeoutMs) {
+    this.s3WriteRetryTotalTimeoutMs = s3WriteRetryTotalTimeoutMs;
+  }
+
   private Set<Tag> toS3Tags(Map<String, String> properties, String prefix) {
     return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()
         .map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
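The new properties flow through `S3FileIOProperties(Map)` exactly like the existing ones, so they can be set wherever catalog or FileIO properties are passed. A minimal sketch of wiring them up through `FileIO.initialize` (the bucket, key, and chosen values here are illustrative, not part of the change):

```java
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class S3RetryConfigExample {
  public static void main(String[] args) {
    // Property keys match the constants added above; values are examples only.
    Map<String, String> properties =
        ImmutableMap.of(
            "s3.read.retry.num-retries", "4", // default: 7
            "s3.read.retry.min-wait-ms", "250", // default: 500
            "s3.write.retry.num-retries", "4",
            "s3.write.retry.total-timeout-ms", String.valueOf(5 * 60 * 1000));

    S3FileIO io = new S3FileIO();
    io.initialize(properties); // S3FileIOProperties is built from this map
    InputFile file = io.newInputFile("s3://example-bucket/path/to/file.parquet");
    // streams opened from this FileIO now retry transient failures
    io.close();
  }
}
```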
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index f1d6c30a27a5..35a560be927d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.HttpURLConnection;
 import java.util.Arrays;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.FileIOMetricsContext;
@@ -32,13 +34,17 @@
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.http.Abortable;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 class S3InputStream extends SeekableInputStream implements RangeReadable {
   private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -90,23 +96,44 @@ public void seek(long newPos) {
 
   @Override
   public int read() throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
+    int[] byteRef = new int[1];
+    retryAndThrow(
+        ignored -> {
+          try {
+            Preconditions.checkState(!closed, "Cannot read: already closed");
+            positionStream();
+
+            byteRef[0] = stream.read();
+          } catch (IOException e) {
+            closeStream();
+            throw new UncheckedIOException(e);
+          }
+        });
 
     pos += 1;
     next += 1;
     readBytes.increment();
     readOperations.increment();
 
-    return stream.read();
+    return byteRef[0];
   }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    Preconditions.checkState(!closed, "Cannot read: already closed");
-    positionStream();
-
-    int bytesRead = stream.read(b, off, len);
+    int[] bytesReadRef = new int[1];
+    retryAndThrow(
+        ignored -> {
+          try {
+            Preconditions.checkState(!closed, "Cannot read: already closed");
+            positionStream();
+            bytesReadRef[0] = stream.read(b, off, len);
+          } catch (IOException e) {
+            closeStream();
+            throw new UncheckedIOException(e);
+          }
+        });
+
+    int bytesRead = bytesReadRef[0];
     pos += bytesRead;
     next += bytesRead;
     readBytes.increment(bytesRead);
@@ -121,7 +148,21 @@ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
 
     String range = String.format("bytes=%s-%s", position, position + length - 1);
 
-    IOUtil.readFully(readRange(range), buffer, offset, length);
+    retryAndThrow(
+        ignored -> {
+          InputStream rangeStream = null;
+          try {
+            rangeStream = readRange(range);
+            IOUtil.readFully(rangeStream, buffer, offset, length);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          } finally {
+            if (rangeStream != null) {
+              abortStream(rangeStream);
+              rangeStream.close();
+            }
+          }
+        });
   }
 
   @Override
@@ -130,7 +171,25 @@ public int readTail(byte[] buffer, int offset, int length) throws IOException {
 
     String range = String.format("bytes=-%s", length);
 
-    return IOUtil.readRemaining(readRange(range), buffer, offset, length);
+    int[] bytesReadRef = new int[1];
+
+    retryAndThrow(
+        ignored -> {
+          InputStream rangeStream = null;
+          try {
+            rangeStream = readRange(range);
+            bytesReadRef[0] = IOUtil.readRemaining(rangeStream, buffer, offset, length);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          } finally {
+            if (rangeStream != null) {
+              abortStream(rangeStream);
+              rangeStream.close();
+            }
+          }
+        });
+
+    return bytesReadRef[0];
   }
 
   private InputStream readRange(String range) {
@@ -199,7 +258,7 @@ private void closeStream() throws IOException {
     if (stream != null) {
       // if we aren't at the end of the stream, and the stream is abortable, then
       // call abort() so we don't read the remaining data with the Apache HTTP client
-      abortStream();
+      abortStream(stream);
       try {
         stream.close();
       } catch (IOException e) {
@@ -213,20 +272,70 @@ private void closeStream() throws IOException {
     }
   }
 
-  private void abortStream() {
+  private void abortStream(InputStream streamToAbort) {
     try {
-      if (stream instanceof Abortable && stream.read() != -1) {
-        ((Abortable) stream).abort();
+      if (streamToAbort instanceof Abortable && streamToAbort.read() != -1) {
+        ((Abortable) streamToAbort).abort();
       }
     } catch (Exception e) {
       LOG.warn("An error occurred while aborting the stream", e);
     }
   }
 
+  private void retryAndThrow(Tasks.Task<Integer, IOException> task) throws IOException {
+    try {
+      Tasks.foreach(0)
+          .retry(s3FileIOProperties.s3ReadRetryNumRetries())
+          .exponentialBackoff(
+              s3FileIOProperties.s3ReadRetryMinWaitMs(),
+              s3FileIOProperties.s3ReadRetryMaxWaitMs(),
+              s3FileIOProperties.s3ReadRetryTotalTimeoutMs(),
+              2.0 /* scale factor */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(task);
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
+  }
+
   public void setSkipSize(int skipSize) {
     this.skipSize = skipSize;
   }
 
+  public static boolean shouldRetry(Exception exception) {
+    if (exception instanceof NotFoundException) {
+      return false;
+    }
+
+    if (exception instanceof AwsServiceException) {
+      switch (((AwsServiceException) exception).statusCode()) {
+        case HttpURLConnection.HTTP_FORBIDDEN:
+        case HttpURLConnection.HTTP_BAD_REQUEST:
+          return false;
+      }
+    }
+
+    if (exception instanceof SdkServiceException) {
+      if (((SdkServiceException) exception).statusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
+        return false;
+      }
+    }
+
+    if (exception instanceof S3Exception) {
+      switch (((S3Exception) exception).statusCode()) {
+        case HttpURLConnection.HTTP_NOT_FOUND:
+        case 400: // bad request
+        case 416: // range not satisfiable
+        case 403: // forbidden
+        case 407: // proxy authentication required
+          return false;
+      }
+    }
+
+    return true;
+  }
+
   @SuppressWarnings("checkstyle:NoFinalizer")
   @Override
   protected void finalize() throws Throwable {
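The read path above funnels every operation through `retryAndThrow`, which relies on wrapping checked `IOException`s in `UncheckedIOException` to get them across the lambda boundary, then unwraps them for callers. A self-contained sketch of that wrap-and-unwrap pattern with Iceberg's `Tasks` (the method names and retry constants here are illustrative, not part of the change):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.util.Tasks;

public class RetryPatternSketch {
  // Runs a flaky I/O action with exponential backoff. Checked IOExceptions are
  // wrapped to cross the lambda boundary, then unwrapped for the caller.
  static int readWithRetries() throws IOException {
    int[] result = new int[1];
    try {
      Tasks.foreach(0)
          .retry(3) // 1 initial attempt + 3 retries
          .exponentialBackoff(500, 120_000, 600_000, 2.0)
          .throwFailureWhenFinished()
          .run(
              ignored -> {
                try {
                  result[0] = flakyRead();
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              });
    } catch (UncheckedIOException e) {
      throw e.getCause(); // surface the original IOException
    }
    return result[0];
  }

  private static int flakyRead() throws IOException {
    return 42; // stand-in for stream.read()
  }
}
```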
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 046abdb61e13..b382a3fb1eb4 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -164,7 +164,7 @@ public void flush() throws IOException {
   public void write(int b) throws IOException {
     if (stream.getCount() >= multiPartSize) {
       newStream();
-      uploadParts();
+      retryAndThrow(ignored -> uploadParts());
     }
 
     stream.write(b);
@@ -174,8 +174,11 @@ public void write(int b) throws IOException {
 
     // switch to multipart upload
     if (multipartUploadId == null && pos >= multiPartThresholdSize) {
-      initializeMultiPartUpload();
-      uploadParts();
+      retryAndThrow(
+          ignored -> {
+            initializeMultiPartUpload();
+            uploadParts();
+          });
     }
   }
 
@@ -195,7 +198,7 @@ public void write(byte[] b, int off, int len) throws IOException {
         relativeOffset += writeSize;
 
         newStream();
-        uploadParts();
+        retryAndThrow(ignored -> uploadParts());
       }
 
       stream.write(b, relativeOffset, remaining);
@@ -205,14 +208,24 @@ public void write(byte[] b, int off, int len) throws IOException {
 
     // switch to multipart upload
     if (multipartUploadId == null && pos >= multiPartThresholdSize) {
-      initializeMultiPartUpload();
-      uploadParts();
+      retryAndThrow(
+          ignored -> {
+            initializeMultiPartUpload();
+            uploadParts();
+          });
     }
   }
 
   private void newStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      retryAndThrow(
+          ignored -> {
+            try {
+              stream.close();
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            }
+          });
     }
 
     createStagingDirectoryIfNotExists();
@@ -262,7 +275,7 @@ public void close() throws IOException {
 
     try {
       stream.close();
-      completeUploads();
+      retryAndThrow(ignored -> completeUploads());
     } finally {
       cleanUpStagingFiles();
     }
@@ -475,6 +488,23 @@ private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
     }
   }
 
+  private void retryAndThrow(Tasks.Task<Integer, IOException> task) throws IOException {
+    try {
+      Tasks.foreach(0)
+          .retry(s3FileIOProperties.s3WriteRetryNumRetries())
+          .exponentialBackoff(
+              s3FileIOProperties.s3WriteRetryMinWaitMs(),
+              s3FileIOProperties.s3WriteRetryMaxWaitMs(),
+              s3FileIOProperties.s3WriteRetryTotalTimeoutMs(),
+              2.0 /* scale factor */)
+          .shouldRetryTest(S3InputStream::shouldRetry)
+          .throwFailureWhenFinished()
+          .run(task);
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
+  }
+
   @SuppressWarnings("checkstyle:NoFinalizer")
   @Override
   protected void finalize() throws Throwable {
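On the backoff parameters: with a 2.0 scale factor, waits roughly double from `min-wait-ms` up to the `max-wait-ms` cap, while `total-timeout-ms` separately bounds the whole retry loop. An illustrative helper showing the approximate wait per attempt (exact jitter and scheduling are up to `Tasks` and are not guaranteed by this sketch):

```java
public class BackoffMath {
  // Approximate wait before the n-th retry (1-based) under the defaults above:
  // 500ms, 1s, 2s, 4s, ... capped at max-wait-ms.
  static long approxWaitMs(int retry, long minWaitMs, long maxWaitMs) {
    double wait = minWaitMs * Math.pow(2.0, retry - 1);
    return (long) Math.min(wait, maxWaitMs);
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 9; n++) {
      // prints 500, 1000, 2000, ..., capped at 120000 (2 minutes)
      System.out.println(approxWaitMs(n, 500, 120_000));
    }
  }
}
```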
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java
new file mode 100644
index 000000000000..3e9d46059c90
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.s3;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import javax.net.ssl.SSLException;
+import org.apache.iceberg.AssertHelpers;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.InvalidObjectStateException;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+public class TestFuzzyS3InputStream extends TestS3InputStream {
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testReadWithFuzzyClientRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRead(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testReadWithFuzzyStreamRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRead(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testReadWithFuzzyClientRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testRead(fuzzyClient(counter, exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testReadWithFuzzyStreamRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testRead(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testSeekWithFuzzyClientRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testSeek(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testSeekWithFuzzyStreamRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testSeek(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testSeekWithFuzzyClientRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testSeek(fuzzyClient(counter, exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testSeekWithFuzzyStreamRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testSeek(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testRangeReadWithFuzzyClientRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRangeRead(fuzzyClient(new AtomicInteger(3), exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testRangeReadWithFuzzyStreamRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    testRangeRead(fuzzyStreamClient(new AtomicInteger(3), (IOException) exception), awsProperties);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testRangeReadWithFuzzyClientRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testRangeRead(fuzzyClient(counter, exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testRangeReadWithFuzzyStreamRetryFail(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    testRetryFail(
+        (counter, awsProperties) -> {
+          try {
+            testRangeRead(fuzzyStreamClient(counter, (IOException) exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        exception,
+        shouldRetry);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testExceptionNoRetry(Exception exception, boolean shouldRetry) throws Exception {
+    Assume.assumeFalse(shouldRetry);
+    testReadExceptionShouldNotRetry(exception);
+  }
+
+  private static Stream<Arguments> provideExceptionAndShouldRetry() {
+    return Stream.of(
+        Arguments.of(new IOException("random failure"), true),
+        Arguments.of(new SSLException("client connection reset"), true),
+        Arguments.of(new SocketTimeoutException("client connection reset"), true),
+        Arguments.of(new EOFException("failure"), true),
+        Arguments.of(
+            AwsServiceException.builder().statusCode(403).message("failure").build(), false),
+        Arguments.of(
+            AwsServiceException.builder().statusCode(400).message("failure").build(), false),
+        Arguments.of(S3Exception.builder().statusCode(404).message("failure").build(), false),
+        Arguments.of(S3Exception.builder().statusCode(416).message("failure").build(), false));
+  }
+
+  private void testRetryFail(
+      BiConsumer<AtomicInteger, S3FileIOProperties> runnable,
+      Exception exception,
+      boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(2);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    AtomicInteger counter = new AtomicInteger(4);
+    AssertHelpers.assertThrowsCause(
+        "Should fail after retries",
+        exception.getClass(),
+        exception.getMessage(),
+        () -> runnable.accept(counter, awsProperties));
+    Assert.assertEquals(
+        "Should have 3 invocations (1 initial call, 2 retries)", 3, 4 - counter.get());
+  }
+
+  private void testReadExceptionShouldNotRetry(Exception exception) throws Exception {
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3ReadRetryNumRetries(4);
+    awsProperties.setS3ReadRetryMinWaitMs(100);
+    AtomicInteger counter = new AtomicInteger(3);
+    AssertHelpers.assertThrowsCause(
+        "Should fail without retry",
+        exception.getClass(),
+        exception.getMessage(),
+        () -> {
+          try {
+            testRead(fuzzyClient(counter, exception), awsProperties);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+    Assert.assertEquals(
+        "Should have 1 invocation (1 initial call, no retry)", 1, 3 - counter.get());
+  }
+
+  private S3Client fuzzyClient(AtomicInteger counter, Exception failure) {
+    S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client()));
+    int round = counter.get();
+
+    // for every round of n invocations, only the last call succeeds
+    doAnswer(
+            invocation -> {
+              if (counter.decrementAndGet() == 0) {
+                counter.set(round);
+                return invocation.callRealMethod();
+              } else {
+                throw failure;
+              }
+            })
+        .when(fuzzyClient)
+        .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    return fuzzyClient;
+  }
+
+  private S3Client fuzzyStreamClient(AtomicInteger counter, IOException failure) {
+    S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client()));
+    doAnswer(
+            invocation ->
+                new FuzzyResponseInputStream(invocation.callRealMethod(), counter, failure))
+        .when(fuzzyClient)
+        .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    return fuzzyClient;
+  }
+
+  /** Wrapper for S3 client, used to mock the final class DefaultS3Client */
+  public static class S3ClientWrapper implements S3Client {
+
+    private final S3Client delegate;
+
+    public S3ClientWrapper(S3Client delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public String serviceName() {
+      return delegate.serviceName();
+    }
+
+    @Override
+    public void close() {
+      delegate.close();
+    }
+
+    @Override
+    public <ReturnT> ReturnT getObject(
+        GetObjectRequest getObjectRequest,
+        ResponseTransformer<GetObjectResponse, ReturnT> responseTransformer)
+        throws NoSuchKeyException, InvalidObjectStateException, AwsServiceException,
+            SdkClientException, S3Exception {
+      return delegate.getObject(getObjectRequest, responseTransformer);
+    }
+
+    @Override
+    public HeadObjectResponse headObject(HeadObjectRequest headObjectRequest)
+        throws NoSuchKeyException, AwsServiceException, SdkClientException, S3Exception {
+      return delegate.headObject(headObjectRequest);
+    }
+
+    @Override
+    public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody)
+        throws AwsServiceException, SdkClientException, S3Exception {
+      return delegate.putObject(putObjectRequest, requestBody);
+    }
+
+    @Override
+    public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest)
+        throws BucketAlreadyExistsException, BucketAlreadyOwnedByYouException, AwsServiceException,
+            SdkClientException, S3Exception {
+      return delegate.createBucket(createBucketRequest);
+    }
+  }
+
+  public static class FuzzyResponseInputStream extends InputStream {
+
+    private final ResponseInputStream<GetObjectResponse> delegate;
+    private final AtomicInteger counter;
+    private final int round;
+    private final IOException exception;
+
+    @SuppressWarnings("unchecked")
+    public FuzzyResponseInputStream(
+        Object invocationResponse, AtomicInteger counter, IOException exception) {
+      this.delegate = (ResponseInputStream<GetObjectResponse>) invocationResponse;
+      this.counter = counter;
+      this.round = counter.get();
+      this.exception = exception;
+    }
+
+    private void checkCounter() throws IOException {
+      // for every round of n invocations, only the last call succeeds
+      if (counter.decrementAndGet() == 0) {
+        counter.set(round);
+      } else {
+        throw exception;
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      checkCounter();
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      checkCounter();
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      checkCounter();
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+      return delegate.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      delegate.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      delegate.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return delegate.markSupported();
+    }
+  }
+}
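`S3ClientWrapper` exists because the SDK's `DefaultS3Client` is final and cannot be spied directly; the tests spy on an open, hand-written delegate and inject failures around `callRealMethod()`. The same pattern in miniature (all names in this sketch are illustrative):

```java
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.concurrent.atomic.AtomicInteger;

public class WrapperSpySketch {
  interface Service {
    String call();
  }

  /** Open delegating wrapper, standing in for S3ClientWrapper. */
  static class ServiceWrapper implements Service {
    private final Service delegate;

    ServiceWrapper(Service delegate) {
      this.delegate = delegate;
    }

    @Override
    public String call() {
      return delegate.call();
    }
  }

  /** For every round of {@code round} calls, only the last reaches the delegate. */
  static Service flaky(Service real, int round) {
    AtomicInteger counter = new AtomicInteger(round);
    Service spied = spy(new ServiceWrapper(real));
    doAnswer(
            invocation -> {
              if (counter.decrementAndGet() == 0) {
                counter.set(round); // reset for the next round
                return invocation.callRealMethod();
              } else {
                throw new RuntimeException("injected failure");
              }
            })
        .when(spied)
        .call();
    return spied;
  }
}
```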
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3OutputStream.java
new file mode 100644
index 000000000000..6ff6ff2c3883
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3OutputStream.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.s3;
+
+import static org.apache.iceberg.metrics.MetricsContext.nullMetrics;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import javax.net.ssl.SSLException;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+
+public class TestFuzzyS3OutputStream extends TestS3OutputStream {
+
+  public TestFuzzyS3OutputStream() throws IOException {}
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testWriteWithFuzzyClientRetrySucceed(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3WriteRetryNumRetries(2);
+    awsProperties.setS3WriteRetryMinWaitMs(100);
+    byte[] data = randomData(10 * 1024 * 1024);
+    S3Client s3 = mock(S3Client.class);
+    S3Client s3Client = fuzzyClient(s3, new AtomicInteger(3), exception);
+    testWrite(data, s3Client, randomURI(), awsProperties, nullMetrics());
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testWriteWithFuzzyClientRetryFailure(Exception exception, boolean shouldRetry)
+      throws IOException {
+    Assume.assumeTrue(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3WriteRetryNumRetries(1);
+    awsProperties.setS3WriteRetryMinWaitMs(100);
+    byte[] data = randomData(10 * 1024 * 1024);
+    S3Client s3 = mock(S3Client.class);
+    S3Client s3Client = fuzzyClient(s3, new AtomicInteger(3), exception);
+
+    try {
+      testWrite(data, s3Client, randomURI(), awsProperties, nullMetrics());
+    } catch (Exception ex) {
+      Assert.assertEquals(exception.getClass(), ex.getCause().getClass());
+      Assert.assertEquals(exception.getMessage(), ex.getCause().getMessage());
+    } finally {
+      verify(s3Client, times(2)).putObject((PutObjectRequest) any(), (RequestBody) any());
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideExceptionAndShouldRetry")
+  public void testWriteExceptionShouldNotRetry(Exception exception, boolean shouldRetry)
+      throws Exception {
+    Assume.assumeFalse(shouldRetry);
+    S3FileIOProperties awsProperties = new S3FileIOProperties();
+    awsProperties.setS3WriteRetryNumRetries(4);
+    awsProperties.setS3WriteRetryMinWaitMs(100);
+    byte[] data = randomData(10 * 1024 * 1024);
+    S3Client s3 = mock(S3Client.class);
+    S3Client s3Client = fuzzyClient(s3, new AtomicInteger(3), exception);
+
+    try {
+      testWrite(data, s3Client, randomURI(), awsProperties, nullMetrics());
+    } catch (Exception ex) {
+      Throwable causeException = ex.getCause() == null ? ex : ex.getCause();
+      Assert.assertEquals(exception.getClass(), causeException.getClass());
+      Assert.assertEquals(exception.getMessage(), causeException.getMessage());
+    } finally {
+      verify(s3Client, times(1)).putObject((PutObjectRequest) any(), (RequestBody) any());
+    }
+  }
+
+  private S3Client fuzzyClient(S3Client s3Client, AtomicInteger counter, Exception failure) {
+    S3Client fuzzyClient = spy(new TestFuzzyS3OutputStream.S3ClientWrapper(s3Client));
+    int round = counter.get();
+    // for every round of n invocations, only the last call succeeds
+    doAnswer(
+            invocation -> {
+              if (counter.decrementAndGet() == 0) {
+                counter.set(round);
+                return invocation.callRealMethod();
+              } else {
+                throw failure;
+              }
+            })
+        .when(fuzzyClient)
+        .uploadPart(any(UploadPartRequest.class), any(RequestBody.class));
+
+    doAnswer(
+            invocation -> {
+              if (counter.decrementAndGet() == 0) {
+                counter.set(round);
+                return invocation.callRealMethod();
+              } else {
+                throw failure;
+              }
+            })
+        .when(fuzzyClient)
+        .putObject(any(PutObjectRequest.class), any(RequestBody.class));
+
+    return fuzzyClient;
+  }
+
+  private static Stream<Arguments> provideExceptionAndShouldRetry() {
+    return Stream.of(
+        Arguments.of(new IOException("random failure"), true),
+        Arguments.of(new SSLException("client connection reset"), true),
+        Arguments.of(new SocketTimeoutException("client connection reset"), true),
+        Arguments.of(new EOFException("failure"), true),
+        Arguments.of(
+            AwsServiceException.builder().statusCode(403).message("failure").build(), false),
+        Arguments.of(
+            AwsServiceException.builder().statusCode(400).message("failure").build(), false),
+        Arguments.of(S3Exception.builder().statusCode(404).message("failure").build(), false),
+        Arguments.of(S3Exception.builder().statusCode(416).message("failure").build(), false),
+        Arguments.of(
+            SdkServiceException.builder().statusCode(403).message("Access Denied").build(), false));
+  }
+
+  /** Wrapper for S3 client, used to mock the final class DefaultS3Client */
+  public static class S3ClientWrapper implements S3Client {
+
+    private final S3Client delegate;
+
+    public S3ClientWrapper(S3Client delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public String serviceName() {
+      return delegate.serviceName();
+    }
+
+    @Override
+    public void close() {
+      delegate.close();
+    }
+
+    @Override
+    public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody)
+        throws AwsServiceException, SdkClientException, S3Exception {
+      return delegate.putObject(putObjectRequest, requestBody);
+    }
+
+    @Override
+    public AbortMultipartUploadResponse abortMultipartUpload(
+        AbortMultipartUploadRequest abortMultipartUploadRequest)
+        throws AwsServiceException, SdkClientException {
+      return delegate.abortMultipartUpload(abortMultipartUploadRequest);
+    }
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
index feaac4eadad5..954d1fb24cc1 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -51,13 +52,18 @@ public void before() {
 
   @Test
   public void testRead() throws Exception {
+    testRead(s3, new S3FileIOProperties());
+  }
+
+  protected void testRead(S3Client s3Client, S3FileIOProperties awsProperties) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/read.dat");
     int dataSize = 1024 * 1024 * 10;
     byte[] data = randomData(dataSize);
 
     writeS3Data(uri, data);
 
-    try (SeekableInputStream in = new S3InputStream(s3, uri)) {
+    try (SeekableInputStream in =
+        new S3InputStream(s3Client, uri, awsProperties, MetricsContext.nullMetrics())) {
       int readSize = 1024;
       byte[] actual = new byte[readSize];
 
@@ -109,6 +115,11 @@ private void readAndCheck(
 
   @Test
   public void testRangeRead() throws Exception {
+    testRangeRead(s3, new S3FileIOProperties());
+  }
+
+  protected void testRangeRead(S3Client s3Client, S3FileIOProperties awsProperties)
+      throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat");
     int dataSize = 1024 * 1024 * 10;
     byte[] expected = randomData(dataSize);
@@ -120,7 +131,8 @@ public void testRangeRead() throws Exception {
 
     writeS3Data(uri, expected);
 
-    try (RangeReadable in = new S3InputStream(s3, uri)) {
+    try (RangeReadable in =
+        new S3InputStream(s3Client, uri, awsProperties, MetricsContext.nullMetrics())) {
       // first 1k
       position = 0;
       offset = 0;
@@ -161,12 +173,17 @@ public void testClose() throws Exception {
 
   @Test
   public void testSeek() throws Exception {
+    testSeek(s3, new S3FileIOProperties());
+  }
+
+  protected void testSeek(S3Client s3Client, S3FileIOProperties awsProperties) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/seek.dat");
     byte[] expected = randomData(1024 * 1024);
 
     writeS3Data(uri, expected);
 
-    try (SeekableInputStream in = new S3InputStream(s3, uri)) {
+    try (SeekableInputStream in =
+        new S3InputStream(s3Client, uri, awsProperties, MetricsContext.nullMetrics())) {
       in.seek(expected.length / 2);
       byte[] actual = new byte[expected.length / 2];
       IOUtil.readFully(in, actual, 0, expected.length / 2);
@@ -198,4 +215,8 @@ private void createBucket(String bucketName) {
       // don't do anything
     }
   }
+
+  protected S3Client s3Client() {
+    return s3;
+  }
 }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index 6f8d1d6cb988..bd5119605ff6 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -21,6 +21,7 @@ import static org.apache.iceberg.metrics.MetricsContext.nullMetrics;
 import static org.mockito.AdditionalAnswers.delegatesTo;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -42,6 +43,7 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
@@ -123,17 +125,13 @@ public void testAbortAfterFailedPartUpload() {
     RuntimeException mockException = new RuntimeException("mock uploadPart failure");
     doThrow(mockException).when(s3mock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
 
-    Assertions.assertThatThrownBy(
-            () -> {
-              try (S3OutputStream stream =
-                  new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) {
-                stream.write(randomData(10 * 1024 * 1024));
-              }
-            })
-        .isInstanceOf(mockException.getClass())
-        .hasMessageContaining(mockException.getMessage());
-
-    verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
+    try {
+      testWrite(randomData(10 * 1024 * 1024), s3mock, randomURI(), properties, nullMetrics());
+    } catch (Exception e) {
+      // swallow
+    } finally {
+      verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any());
+    }
   }
 
   @Test
@@ -145,15 +143,13 @@ public void testAbortMultipart() {
 
     Assertions.assertThatThrownBy(
            () ->
-                new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())
-                    .write(randomData(10 * 1024 * 1024)))
+                testWrite(
+                    randomData(10 * 1024 * 1024), s3mock, randomURI(), properties, nullMetrics()))
        .isInstanceOf(mockException.getClass())
        .hasMessageContaining(mockException.getMessage());
 
-    verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
+    verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any());
   }
 
 @Test
@@ -325,21 +325,33 @@ private byte[] readS3Data(S3URI uri) {
     return data.asByteArray();
   }
 
-  private byte[] randomData(int size) {
+  private void createBucket(String bucketName) {
+    try {
+      s3.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
+    } catch (BucketAlreadyExistsException e) {
+      // do nothing
+    }
+  }
+
+  protected byte[] randomData(int size) {
     byte[] result = new byte[size];
     random.nextBytes(result);
     return result;
   }
 
-  private S3URI randomURI() {
+  protected S3URI randomURI() {
     return new S3URI(String.format("s3://%s/data/%s.dat", BUCKET, UUID.randomUUID()));
   }
 
-  private void createBucket(String bucketName) {
-    try {
-      s3.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
-    } catch (BucketAlreadyExistsException e) {
-      // do nothing
+  protected void testWrite(
+      byte[] data,
+      S3Client s3Client,
+      S3URI s3Location,
+      S3FileIOProperties awsProperties,
+      MetricsContext metrics)
+      throws IOException {
+    try (S3OutputStream stream =
+        new S3OutputStream(s3Client, s3Location, awsProperties, metrics)) {
+      stream.write(data);
     }
   }
 }
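One contract these tests encode in their assertions: `retry(n)` means one initial attempt plus `n` retries, i.e. `n + 1` attempts in total. A standalone check of that arithmetic against Iceberg's `Tasks` (illustrative, not part of the change):

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.util.Tasks;

public class RetryCountSketch {
  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();
    try {
      Tasks.foreach(0)
          .retry(2) // 2 retries => up to 3 attempts in total
          .exponentialBackoff(1, 10, 100, 2.0)
          .throwFailureWhenFinished()
          .run(
              ignored -> {
                attempts.incrementAndGet();
                throw new RuntimeException("always fails");
              });
    } catch (RuntimeException e) {
      // expected after the final attempt
    }
    System.out.println(attempts.get()); // prints 3: 1 initial call + 2 retries
  }
}
```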