144 changes: 144 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -385,6 +385,46 @@ public class S3FileIOProperties implements Serializable {
*/
private static final String S3_FILE_IO_USER_AGENT = "s3fileio/" + EnvironmentContext.get();

/** Number of times to retry an S3 read operation. */
public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";

public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 7;

/** Minimum wait time to retry an S3 read operation. */
public static final String S3_READ_RETRY_MIN_WAIT_MS = "s3.read.retry.min-wait-ms";

public static final long S3_READ_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds

/** Maximum wait time to retry an S3 read operation. */
public static final String S3_READ_RETRY_MAX_WAIT_MS = "s3.read.retry.max-wait-ms";

public static final long S3_READ_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes

/** Total retry timeout for an S3 read operation. */
public static final String S3_READ_RETRY_TOTAL_TIMEOUT_MS = "s3.read.retry.total-timeout-ms";

public static final long S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes

/** Number of times to retry an S3 write operation. */
public static final String S3_WRITE_RETRY_NUM_RETRIES = "s3.write.retry.num-retries";

public static final int S3_WRITE_RETRY_NUM_RETRIES_DEFAULT = 7;

/** Minimum wait time to retry an S3 write operation. */
public static final String S3_WRITE_RETRY_MIN_WAIT_MS = "s3.write.retry.min-wait-ms";

public static final long S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds

/** Maximum wait time to retry an S3 write operation. */
public static final String S3_WRITE_RETRY_MAX_WAIT_MS = "s3.write.retry.max-wait-ms";

public static final long S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes

/** Total retry timeout for an S3 write operation. */
public static final String S3_WRITE_RETRY_TOTAL_TIMEOUT_MS = "s3.write.retry.total-timeout-ms";

public static final long S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes

private String sseType;
private String sseKey;
private String sseMd5;
@@ -408,6 +448,14 @@ public class S3FileIOProperties implements Serializable {
private boolean isDeleteEnabled;
private final Map<String, String> bucketToAccessPointMapping;
private boolean isPreloadClientEnabled;
private int s3ReadRetryNumRetries;
private long s3ReadRetryMinWaitMs;
private long s3ReadRetryMaxWaitMs;
private long s3ReadRetryTotalTimeoutMs;
private int s3WriteRetryNumRetries;
private long s3WriteRetryMinWaitMs;
private long s3WriteRetryMaxWaitMs;
private long s3WriteRetryTotalTimeoutMs;
private boolean isDualStackEnabled;
private boolean isPathStyleAccess;
private boolean isUseArnRegionEnabled;
@@ -440,6 +488,14 @@ public S3FileIOProperties() {
this.isDeleteEnabled = DELETE_ENABLED_DEFAULT;
this.bucketToAccessPointMapping = Collections.emptyMap();
this.isPreloadClientEnabled = PRELOAD_CLIENT_ENABLED_DEFAULT;
this.s3ReadRetryNumRetries = S3_READ_RETRY_NUM_RETRIES_DEFAULT;
this.s3ReadRetryMinWaitMs = S3_READ_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3ReadRetryMaxWaitMs = S3_READ_RETRY_MAX_WAIT_MS_DEFAULT;
this.s3ReadRetryTotalTimeoutMs = S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;
this.s3WriteRetryNumRetries = S3_WRITE_RETRY_NUM_RETRIES_DEFAULT;
this.s3WriteRetryMinWaitMs = S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3WriteRetryMaxWaitMs = S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT;
this.s3WriteRetryTotalTimeoutMs = S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;
this.isDualStackEnabled = DUALSTACK_ENABLED_DEFAULT;
this.isPathStyleAccess = PATH_STYLE_ACCESS_DEFAULT;
this.isUseArnRegionEnabled = USE_ARN_REGION_ENABLED_DEFAULT;
@@ -532,6 +588,30 @@ public S3FileIOProperties(Map<String, String> properties) {
this.isPreloadClientEnabled =
PropertyUtil.propertyAsBoolean(
properties, PRELOAD_CLIENT_ENABLED, PRELOAD_CLIENT_ENABLED_DEFAULT);
this.s3ReadRetryNumRetries =
PropertyUtil.propertyAsInt(
properties, S3_READ_RETRY_NUM_RETRIES, S3_READ_RETRY_NUM_RETRIES_DEFAULT);
this.s3ReadRetryMinWaitMs =
PropertyUtil.propertyAsLong(
properties, S3_READ_RETRY_MIN_WAIT_MS, S3_READ_RETRY_MIN_WAIT_MS_DEFAULT);
this.s3ReadRetryMaxWaitMs =
PropertyUtil.propertyAsLong(
properties, S3_READ_RETRY_MAX_WAIT_MS, S3_READ_RETRY_MAX_WAIT_MS_DEFAULT);
this.s3ReadRetryTotalTimeoutMs =
PropertyUtil.propertyAsLong(
properties, S3_READ_RETRY_TOTAL_TIMEOUT_MS, S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);
this.s3WriteRetryNumRetries =
PropertyUtil.propertyAsInt(
properties, S3_WRITE_RETRY_NUM_RETRIES, S3_WRITE_RETRY_NUM_RETRIES_DEFAULT);
this.s3WriteRetryMinWaitMs =
PropertyUtil.propertyAsLong(
properties, S3_WRITE_RETRY_MIN_WAIT_MS, S3_WRITE_RETRY_MIN_WAIT_MS_DEFAULT);
this.s3WriteRetryMaxWaitMs =
PropertyUtil.propertyAsLong(
properties, S3_WRITE_RETRY_MAX_WAIT_MS, S3_WRITE_RETRY_MAX_WAIT_MS_DEFAULT);
this.s3WriteRetryTotalTimeoutMs =
PropertyUtil.propertyAsLong(
properties, S3_WRITE_RETRY_TOTAL_TIMEOUT_MS, S3_WRITE_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);
this.isRemoteSigningEnabled =
PropertyUtil.propertyAsBoolean(
properties, REMOTE_SIGNING_ENABLED, REMOTE_SIGNING_ENABLED_DEFAULT);
@@ -723,6 +803,70 @@ public String writeStorageClass() {
return writeStorageClass;
}

public int s3ReadRetryNumRetries() {
return s3ReadRetryNumRetries;
}

public void setS3ReadRetryNumRetries(int s3ReadRetryNumRetries) {
this.s3ReadRetryNumRetries = s3ReadRetryNumRetries;
}

public long s3ReadRetryMinWaitMs() {
return s3ReadRetryMinWaitMs;
}

public void setS3ReadRetryMinWaitMs(long s3ReadRetryMinWaitMs) {
this.s3ReadRetryMinWaitMs = s3ReadRetryMinWaitMs;
}

public long s3ReadRetryMaxWaitMs() {
return s3ReadRetryMaxWaitMs;
}

public void setS3ReadRetryMaxWaitMs(long s3ReadRetryMaxWaitMs) {
this.s3ReadRetryMaxWaitMs = s3ReadRetryMaxWaitMs;
}

public long s3ReadRetryTotalTimeoutMs() {
return s3ReadRetryTotalTimeoutMs;
}

public void setS3ReadRetryTotalTimeoutMs(long s3ReadRetryTotalTimeoutMs) {
this.s3ReadRetryTotalTimeoutMs = s3ReadRetryTotalTimeoutMs;
}

public int s3WriteRetryNumRetries() {
return s3WriteRetryNumRetries;
}

public void setS3WriteRetryNumRetries(int s3WriteRetryNumRetries) {
this.s3WriteRetryNumRetries = s3WriteRetryNumRetries;
}

public long s3WriteRetryMinWaitMs() {
return s3WriteRetryMinWaitMs;
}

public void setS3WriteRetryMinWaitMs(long s3WriteRetryMinWaitMs) {
this.s3WriteRetryMinWaitMs = s3WriteRetryMinWaitMs;
}

public long s3WriteRetryMaxWaitMs() {
return s3WriteRetryMaxWaitMs;
}

public void setS3WriteRetryMaxWaitMs(long s3WriteRetryMaxWaitMs) {
this.s3WriteRetryMaxWaitMs = s3WriteRetryMaxWaitMs;
}

public long s3WriteRetryTotalTimeoutMs() {
return s3WriteRetryTotalTimeoutMs;
}

public void setS3WriteRetryTotalTimeoutMs(long s3WriteRetryTotalTimeoutMs) {
this.s3WriteRetryTotalTimeoutMs = s3WriteRetryTotalTimeoutMs;
}

private Set<Tag> toS3Tags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
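A quick usage sketch (not part of this diff): the new retry settings are plain string properties, so they can be supplied through the map-based S3FileIOProperties constructor shown above and read back via the new accessors. The class name and the property values below are illustrative only, not recommendations or defaults.

// Illustrative sketch: wiring the new read-retry properties through the
// map-based constructor added in this file. Values are examples only.
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

public class RetryPropertiesExample {
  public static void main(String[] args) {
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put(S3FileIOProperties.S3_READ_RETRY_NUM_RETRIES, "5");
    catalogProps.put(S3FileIOProperties.S3_READ_RETRY_MIN_WAIT_MS, "1000"); // 1 second
    catalogProps.put(S3FileIOProperties.S3_READ_RETRY_MAX_WAIT_MS, "30000"); // 30 seconds
    catalogProps.put(S3FileIOProperties.S3_READ_RETRY_TOTAL_TIMEOUT_MS, "300000"); // 5 minutes

    S3FileIOProperties s3Properties = new S3FileIOProperties(catalogProps);
    System.out.println("read retries: " + s3Properties.s3ReadRetryNumRetries());
  }
}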
135 changes: 122 additions & 13 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -20,6 +20,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.util.Arrays;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIOMetricsContext;
@@ -32,13 +34,17 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;

class S3InputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -90,23 +96,44 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
int[] byteRef = new int[1];
retryAndThrow(
ignored -> {
try {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

byteRef[0] = stream.read();
} catch (IOException e) {
closeStream();
throw new UncheckedIOException(e);
}
});

pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();

return stream.read();
return byteRef[0];
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = stream.read(b, off, len);
int[] bytesReadRef = new int[1];
retryAndThrow(
ignored -> {
try {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
bytesReadRef[0] = stream.read(b, off, len);
} catch (IOException e) {
closeStream();
throw new UncheckedIOException(e);
}
});

int bytesRead = bytesReadRef[0];
pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
@@ -121,7 +148,21 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro

String range = String.format("bytes=%s-%s", position, position + length - 1);

IOUtil.readFully(readRange(range), buffer, offset, length);
retryAndThrow(
ignored -> {
InputStream rangeStream = null;
try {
rangeStream = readRange(range);
IOUtil.readFully(rangeStream, buffer, offset, length);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
if (rangeStream != null) {
abortStream(rangeStream);
rangeStream.close();
}
}
});
}

@Override
@@ -130,7 +171,25 @@ public int readTail(byte[] buffer, int offset, int length) throws IOException {

String range = String.format("bytes=-%s", length);

return IOUtil.readRemaining(readRange(range), buffer, offset, length);
int[] bytesReadRef = new int[1];

retryAndThrow(
ignored -> {
InputStream rangeStream = null;
try {
rangeStream = readRange(range);
bytesReadRef[0] = IOUtil.readRemaining(rangeStream, buffer, offset, length);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
if (rangeStream != null) {
abortStream(rangeStream);
rangeStream.close();
}
}
});

return bytesReadRef[0];
}

private InputStream readRange(String range) {
@@ -199,7 +258,7 @@ private void closeStream() throws IOException {
if (stream != null) {
// if we aren't at the end of the stream, and the stream is abortable, then
// call abort() so we don't read the remaining data with the Apache HTTP client
abortStream();
abortStream(stream);
try {
stream.close();
} catch (IOException e) {
@@ -213,20 +272,70 @@ private void closeStream() throws IOException {
}
}

private void abortStream() {
private void abortStream(InputStream streamToAbort) {
try {
if (stream instanceof Abortable && stream.read() != -1) {
((Abortable) stream).abort();
if (streamToAbort instanceof Abortable && streamToAbort.read() != -1) {
((Abortable) streamToAbort).abort();
}
} catch (Exception e) {
LOG.warn("An error occurred while aborting the stream", e);
}
}

private void retryAndThrow(Tasks.Task task) throws IOException {
try {
Tasks.foreach(0)
.retry(s3FileIOProperties.s3ReadRetryNumRetries())
.exponentialBackoff(
s3FileIOProperties.s3ReadRetryMinWaitMs(),
s3FileIOProperties.s3ReadRetryMaxWaitMs(),
s3FileIOProperties.s3ReadRetryTotalTimeoutMs(),
2.0 /* backoff scale factor */)
.shouldRetryTest(S3InputStream::shouldRetry)
.throwFailureWhenFinished()
.run(task);
} catch (UncheckedIOException e) {
throw e.getCause();
}
}
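For context, here is a minimal standalone sketch of the Tasks-based retry pattern that retryAndThrow() relies on. The TasksRetrySketch class, the flakyRead helper, and the literal backoff values are hypothetical and exist only to show the shape of the call; the real code pulls its limits from S3FileIOProperties and classifies failures with shouldRetry().

// Minimal sketch of the retry pattern used above, with made-up values.
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.util.Tasks;

public class TasksRetrySketch {
  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();

    Tasks.foreach(0)
        .retry(3) // analogous to s3.read.retry.num-retries
        .exponentialBackoff(500, 30_000, 120_000, 2.0) // min wait, max wait, total timeout, scale
        .shouldRetryTest(e -> e instanceof RuntimeException) // stand-in for shouldRetry()
        .throwFailureWhenFinished()
        .run(ignored -> flakyRead(attempts));

    System.out.println("succeeded on attempt " + attempts.get());
  }

  // Hypothetical operation that fails twice before succeeding.
  private static void flakyRead(AtomicInteger attempts) {
    if (attempts.incrementAndGet() < 3) {
      throw new RuntimeException("transient failure");
    }
  }
}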

public void setSkipSize(int skipSize) {
this.skipSize = skipSize;
}

public static boolean shouldRetry(Exception exception) {
if (exception instanceof NotFoundException) {
return false;
}

if (exception instanceof AwsServiceException) {
switch (((AwsServiceException) exception).statusCode()) {
case HttpURLConnection.HTTP_FORBIDDEN:
case HttpURLConnection.HTTP_BAD_REQUEST:
return false;
}
}

if (exception instanceof SdkServiceException) {
if (((SdkServiceException) exception).statusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
return false;
}
}

if (exception instanceof S3Exception) {
switch (((S3Exception) exception).statusCode()) {
case HttpURLConnection.HTTP_NOT_FOUND:
case 400: // bad request
case 416: // requested range not satisfiable
case 403: // forbidden
case 407: // proxy authentication required
return false;
}
}

return true;
}

@SuppressWarnings("checkstyle:NoFinalizer")
@Override
protected void finalize() throws Throwable {