72 changes: 72 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
@Deprecated
public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;

/**
* Number of times to retry an S3 read operation.
*/
public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 7;

/**
* Minimum wait time before retrying an S3 read operation.
*/
public static final String S3_READ_RETRY_MIN_WAIT_MS = "s3.read.retry.min-wait-ms";
public static final long S3_READ_RETRY_MIN_WAIT_MS_DEFAULT = 500; // 0.5 seconds

/**
* Maximum wait time before retrying an S3 read operation.
*/
public static final String S3_READ_RETRY_MAX_WAIT_MS = "s3.read.retry.max-wait-ms";
public static final long S3_READ_RETRY_MAX_WAIT_MS_DEFAULT = 2 * 60 * 1000; // 2 minutes

/**
* Total retry timeout for an S3 read operation.
*/
public static final String S3_READ_RETRY_TOTAL_TIMEOUT_MS = "s3.read.retry.total-timeout-ms";
public static final long S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes

/**
* Used by {@link LakeFormationAwsClientFactory}.
* The table name used as part of lake formation credentials request.
@@ -380,6 +404,10 @@ public class AwsProperties implements Serializable {
private int s3FileIoDeleteThreads;
private boolean isS3DeleteEnabled;
private final Map<String, String> s3BucketToAccessPointMapping;
private int s3ReadRetryNumRetries;
private long s3ReadRetryMinWaitMs;
private long s3ReadRetryMaxWaitMs;
private long s3ReadRetryTotalTimeoutMs;

private String glueCatalogId;
private boolean glueCatalogSkipArchive;
@@ -404,6 +432,10 @@ public AwsProperties() {
this.s3FileIoDeleteThreads = Runtime.getRuntime().availableProcessors();
this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
this.s3BucketToAccessPointMapping = ImmutableMap.of();
this.s3ReadRetryNumRetries = S3_READ_RETRY_NUM_RETRIES_DEFAULT;
this.s3ReadRetryMinWaitMs = S3_READ_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3ReadRetryMaxWaitMs = S3_READ_RETRY_MAX_WAIT_MS_DEFAULT;
this.s3ReadRetryTotalTimeoutMs = S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT;

this.glueCatalogId = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
@@ -472,6 +504,14 @@ public AwsProperties(Map<String, String> properties) {
Runtime.getRuntime().availableProcessors());
this.isS3DeleteEnabled = PropertyUtil.propertyAsBoolean(properties, S3_DELETE_ENABLED, S3_DELETE_ENABLED_DEFAULT);
this.s3BucketToAccessPointMapping = PropertyUtil.propertiesWithPrefix(properties, S3_ACCESS_POINTS_PREFIX);
this.s3ReadRetryNumRetries = PropertyUtil.propertyAsInt(properties, S3_READ_RETRY_NUM_RETRIES,
S3_READ_RETRY_NUM_RETRIES_DEFAULT);
this.s3ReadRetryMinWaitMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_MIN_WAIT_MS,
S3_READ_RETRY_MIN_WAIT_MS_DEFAULT);
this.s3ReadRetryMaxWaitMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_MAX_WAIT_MS,
S3_READ_RETRY_MAX_WAIT_MS_DEFAULT);
this.s3ReadRetryTotalTimeoutMs = PropertyUtil.propertyAsLong(properties, S3_READ_RETRY_TOTAL_TIMEOUT_MS,
S3_READ_RETRY_TOTAL_TIMEOUT_MS_DEFAULT);

this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
DYNAMODB_TABLE_NAME_DEFAULT);
@@ -613,6 +653,38 @@ public void setS3DeleteEnabled(boolean s3DeleteEnabled) {
this.isS3DeleteEnabled = s3DeleteEnabled;
}

public int s3ReadRetryNumRetries() {
return s3ReadRetryNumRetries;
}

public void setS3ReadRetryNumRetries(int s3ReadRetryNumRetries) {
this.s3ReadRetryNumRetries = s3ReadRetryNumRetries;
}

public long s3ReadRetryMinWaitMs() {
return s3ReadRetryMinWaitMs;
}

public void setS3ReadRetryMinWaitMs(long s3ReadRetryMinWaitMs) {
this.s3ReadRetryMinWaitMs = s3ReadRetryMinWaitMs;
}

public long s3ReadRetryMaxWaitMs() {
return s3ReadRetryMaxWaitMs;
}

public void setS3ReadRetryMaxWaitMs(long s3ReadRetryMaxWaitMs) {
this.s3ReadRetryMaxWaitMs = s3ReadRetryMaxWaitMs;
}

public long s3ReadRetryTotalTimeoutMs() {
return s3ReadRetryTotalTimeoutMs;
}

public void setS3ReadRetryTotalTimeoutMs(long s3ReadRetryTotalTimeoutMs) {
this.s3ReadRetryTotalTimeoutMs = s3ReadRetryTotalTimeoutMs;
}

private Set<Tag> toTags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix)
.entrySet().stream()
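For context (not part of the diff), a minimal sketch of how the new retry properties could be supplied, assuming they are passed through the same string-keyed properties map that the AwsProperties(Map) constructor above already parses; the values are illustrative, not recommendations:

import java.util.Map;
import org.apache.iceberg.aws.AwsProperties;

public class S3ReadRetryConfigExample {
  public static void main(String[] args) {
    // Keys match the constants added in this PR; values are arbitrary examples.
    Map<String, String> catalogProperties = Map.of(
        "s3.read.retry.num-retries", "5",
        "s3.read.retry.min-wait-ms", "250",
        "s3.read.retry.max-wait-ms", "60000",
        "s3.read.retry.total-timeout-ms", "300000");

    AwsProperties awsProperties = new AwsProperties(catalogProperties);

    // S3InputStream reads the parsed values through the new accessors.
    System.out.println(awsProperties.s3ReadRetryNumRetries()); // 5
    System.out.println(awsProperties.s3ReadRetryMinWaitMs());  // 250
  }
}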
181 changes: 157 additions & 24 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -19,8 +19,11 @@

package org.apache.iceberg.aws.s3;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.util.Arrays;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.FileIOMetricsContext;
@@ -33,11 +36,16 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

class S3InputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -88,23 +96,69 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
int[] byteRef = new int[1];
Contributor: This is less than ideal, but Tasks is a little limited in this area because it's really based on threaded execution and we're reusing it here for retry. It might be worth exploring whether we can tweak Tasks to support this use case:

Ideally we'd have something like:

int read = Tasks.single()
    .exponentialBackoff(...)
    ...
    .run(<function>);

Contributor: We could also consider bringing in Failsafe, which is more closely aligned with what we want for these cases, though it is a new dependency (it has zero dependencies of its own and is Apache licensed).
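Purely as an illustration of that suggestion (Failsafe is not added by this PR), a single-byte read retried with Failsafe 3.x (dev.failsafe) could look roughly like the sketch below; it assumes the existing S3InputStream members awsProperties, stream, positionStream() and closeStream(), with the backoff values taken from the new AwsProperties accessors:

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.time.Duration;

// Hypothetical helper inside S3InputStream; a sketch, not part of this PR.
private int readWithFailsafe() {
  RetryPolicy<Integer> policy = RetryPolicy.<Integer>builder()
      .handle(IOException.class) // retry only IO failures
      .withBackoff(
          Duration.ofMillis(awsProperties.s3ReadRetryMinWaitMs()),
          Duration.ofMillis(awsProperties.s3ReadRetryMaxWaitMs()))
      .withMaxRetries(awsProperties.s3ReadRetryNumRetries())
      .build();

  return Failsafe.with(policy).get(() -> {
    try {
      positionStream();
      return stream.read();
    } catch (IOException e) {
      closeStream(); // force the next attempt to reopen at the current position
      throw e;
    }
  });
}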

try {
Tasks.foreach(0)
Contributor: Any reason we don't pass the stream into the foreach? I guess it's a private field, so it's a bit awkward to pass in as an argument, but it seems more readable than Tasks.foreach(0) to me.

Contributor (Author): I think we cannot, because the input stream needs to be closed and re-opened; if we passed it in here, the retry would always run against the same stream, which is already closed after the first failure.

Contributor: This is a lot of duplicate code for the retry logic. Could we consolidate it into a method that wraps just the logic we want to execute?

For example:

retry(() -> {
    Preconditions.checkState(!closed, "Cannot read: already closed");
    positionStream();

    byteRef[0] =  stream.read();
})

(You may have to use a consumer/function for the range versions, but there is probably a more concise way to write this.)
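A sketch of one way that consolidation could look with the Tasks API already used in this PR; retryRead is a hypothetical helper, and each read method would pass in only the body that actually touches the stream:

// Hypothetical private helper in S3InputStream wrapping the shared retry chain.
private void retryRead(Runnable readOperation) throws IOException {
  try {
    Tasks.foreach(0)
        .retry(awsProperties.s3ReadRetryNumRetries())
        .exponentialBackoff(
            awsProperties.s3ReadRetryMinWaitMs(),
            awsProperties.s3ReadRetryMaxWaitMs(),
            awsProperties.s3ReadRetryTotalTimeoutMs(),
            2.0 /* exponential */)
        .shouldRetryTest(S3InputStream::shouldRetry)
        .throwFailureWhenFinished()
        .run(ignored -> readOperation.run());
  } catch (UncheckedIOException e) {
    throw e.getCause();
  }
}

// read() would then shrink to roughly:
// int[] byteRef = new int[1];
// retryRead(() -> {
//   try {
//     Preconditions.checkState(!closed, "Cannot read: already closed");
//     positionStream();
//     byteRef[0] = stream.read();
//   } catch (IOException e) {
//     closeStream();
//     throw new UncheckedIOException(e);
//   }
// });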

.retry(awsProperties.s3ReadRetryNumRetries())
.exponentialBackoff(
awsProperties.s3ReadRetryMinWaitMs(),
awsProperties.s3ReadRetryMaxWaitMs(),
awsProperties.s3ReadRetryTotalTimeoutMs(),
2.0 /* exponential */)
.shouldRetryTest(S3InputStream::shouldRetry)
.throwFailureWhenFinished()
.run(ignored -> {
try {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

byteRef[0] = stream.read();
} catch (IOException e) {
closeStream();
throw new UncheckedIOException(e);
Contributor: On an unrecoverable network error, abortStream() is required to avoid the failed TLS connection being recycled. This adds a new challenge: identifying an unrecoverable network error among the deeply nested stacks that can come from the SDK, and distinguishing it from other failures.

}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}

pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();

return stream.read();
return byteRef[0];
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
int[] bytesReadRef = new int[1];
try {
Tasks.foreach(0)
.retry(awsProperties.s3ReadRetryNumRetries())
.exponentialBackoff(
awsProperties.s3ReadRetryMinWaitMs(),
awsProperties.s3ReadRetryMaxWaitMs(),
awsProperties.s3ReadRetryTotalTimeoutMs(),
2.0 /* exponential */)
.shouldRetryTest(S3InputStream::shouldRetry)
.throwFailureWhenFinished()
.run(ignored -> {
try {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

bytesReadRef[0] = stream.read(b, off, len);
} catch (IOException e) {
closeStream();
throw new UncheckedIOException(e);
}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}

int bytesRead = stream.read(b, off, len);
int bytesRead = bytesReadRef[0];
pos += bytesRead;
next += bytesRead;
readBytes.increment((long) bytesRead);
@@ -118,17 +172,65 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro
Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);

String range = String.format("bytes=%s-%s", position, position + length - 1);

IOUtil.readFully(readRange(range), buffer, offset, length);
try {
Tasks.foreach(0)
.retry(awsProperties.s3ReadRetryNumRetries())
.exponentialBackoff(
awsProperties.s3ReadRetryMinWaitMs(),
awsProperties.s3ReadRetryMaxWaitMs(),
awsProperties.s3ReadRetryTotalTimeoutMs(),
2.0 /* exponential */)
.shouldRetryTest(S3InputStream::shouldRetry)
.throwFailureWhenFinished()
.run(ignored -> {
InputStream rangeStream = null;
try {
rangeStream = readRange(range);
IOUtil.readFully(rangeStream, buffer, offset, length);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeServerSideStream(rangeStream);
}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);

String range = String.format("bytes=-%s", length);
int[] bytesReadRef = new int[1];

try {
Tasks.foreach(0)
.retry(awsProperties.s3ReadRetryNumRetries())
.exponentialBackoff(
awsProperties.s3ReadRetryMinWaitMs(),
awsProperties.s3ReadRetryMaxWaitMs(),
awsProperties.s3ReadRetryTotalTimeoutMs(),
2.0 /* exponential */)
.shouldRetryTest(S3InputStream::shouldRetry)
.throwFailureWhenFinished()
.run(ignored -> {
InputStream rangeStream = null;
try {
rangeStream = readRange(range);
bytesReadRef[0] = IOUtil.readRemaining(rangeStream, buffer, offset, length);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeServerSideStream(rangeStream);
}
});
} catch (UncheckedIOException e) {
throw e.getCause();
}

return IOUtil.readRemaining(readRange(range), buffer, offset, length);
return bytesReadRef[0];
}

private InputStream readRange(String range) {
@@ -172,31 +274,62 @@ private void positionStream() throws IOException {
}

// close the stream and open at desired position
LOG.debug("Seek with new stream for {} to offset {}", location, next);
LOG.warn("Seek with new stream for {} to offset {}", location, next);
pos = next;
openStream();
}

private void openStream() throws IOException {
GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
.bucket(location.bucket())
.key(location.key())
.range(String.format("bytes=%s-", pos));

S3RequestUtil.configureEncryption(awsProperties, requestBuilder);

private void openStream() {
closeStream();
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
stream = readRange(String.format("bytes=%s-", pos));
}

private void closeStream() {
closeServerSideStream(stream);
stream = null;
}

private void closeStream() throws IOException {
if (stream != null) {
stream.close();
private static void closeServerSideStream(InputStream streamToClose) {
Contributor: I'd just put this in closeStream() rather than have a separate method for it.

if (streamToClose != null) {
try {
if (streamToClose instanceof Abortable) {
// Stated in the ResponseInputStream javadoc:
// If it is not desired to read remaining data from the stream,
// you can explicitly abort the connection via abort().
((Abortable) streamToClose).abort();
Contributor: abort() removes the connection from the HTTP pool.
• this is good if you are closing after an unrecoverable network error
• it is absolutely what you do not want to do on a normal read
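To make that distinction concrete, here is a hedged sketch (not what this PR implements, which always aborts Abortable streams) of separating the normal close path from the error path:

// Sketch only: abort() for broken connections, close() when the connection can be reused.
private static void closeOrAbort(InputStream streamToClose, boolean encounteredError) {
  if (streamToClose == null) {
    return;
  }
  try {
    if (encounteredError && streamToClose instanceof Abortable) {
      ((Abortable) streamToClose).abort(); // drop a possibly broken connection from the pool
    } else {
      streamToClose.close(); // normal path: let the SDK recycle the connection
    }
  } catch (IOException | AbortedException e) {
    // ignore failures while cleaning up
  }
}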

} else {
streamToClose.close();
}
} catch (IOException | AbortedException e) {
// ignore failure to abort or close stream
}
}
}

public void setSkipSize(int skipSize) {
this.skipSize = skipSize;
private static boolean shouldRetry(Exception exception) {
Contributor: I think we need to be more explicit about the cases where we do want to retry rather than just defaulting to retry. I think there were specific cases that were mentioned as known issues (e.g. socket timeout). However, the problem we've had in other retry scenarios is that the retry cases are overly broad and retry when they really shouldn't.

I think the default here should return false and only return true for the known Exceptions.

Contributor: I'm reasonably confident it's a lot more complicated than this, especially as SDK-level failures often create deep chains with the underlying cause at the bottom.
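If the default were flipped as suggested above, an allowlist-style check that also walks the cause chain might look roughly like the sketch below; the specific exception classes treated as retryable are an assumption for illustration, not a list established by this PR:

// Sketch only: retry nothing by default, opting in known transient network failures.
private static boolean shouldRetryAllowlist(Exception exception) {
  Throwable current = exception;
  while (current != null) {
    if (current instanceof java.io.EOFException) {
      return false; // reading past the end of the object will not recover
    }
    if (current instanceof java.net.SocketTimeoutException
        || current instanceof java.net.SocketException
        || current instanceof javax.net.ssl.SSLException) {
      return true; // likely transient network failure
    }
    current = current.getCause();
  }
  return false; // unknown failures are not retried
}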

if (exception instanceof UncheckedIOException) {
if (exception.getCause() instanceof EOFException) {
return false;
}
}

if (exception instanceof AwsServiceException) {
switch (((AwsServiceException) exception).statusCode()) {
case HttpURLConnection.HTTP_FORBIDDEN:
case HttpURLConnection.HTTP_BAD_REQUEST:
return false;
}
}

if (exception instanceof S3Exception) {
switch (((S3Exception) exception).statusCode()) {
case HttpURLConnection.HTTP_NOT_FOUND:
case 416: // range not satisfied
return false;
}
}

return true;
Comment on lines +309 to +332
Contributor (@singhpk234, May 31, 2022): Should we add this to a separate util class, considering it can be extended to all S3 interactions?

Also, are there any pointers on whether we know this is the complete list, given the APIs we use to connect to S3? For example: (sample list: S3A)

Contributor (Author): Let's discuss this in the thread above: #4912 (comment)

Comment on lines +310 to +332
Contributor: Curious, is this the same retry policy that Hadoop S3A has?

Contributor (Author): It's not. It's closer to the ones in Presto and Trino. Basically it retries almost all IO exceptions except for EOF, because those are most likely network issues. For AWS-side exceptions, this logic seems sufficient to me if it has proven sufficient in Presto and Trino. I am not sure we need to list every single possible exception class like S3A did.

}

@SuppressWarnings("checkstyle:NoFinalizer")