TestFlakyS3InputStream.java

@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import javax.net.ssl.SSLException;
+import org.apache.http.ConnectionClosedException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -52,6 +53,7 @@
 public class TestFlakyS3InputStream extends TestS3InputStream {

   private AtomicInteger resetForRetryCounter;
+  private static final long CONTENT_LENGTH = 1024 * 1024; // An arbitrary content length.

   @BeforeEach
   public void setupTest() {
@@ -60,7 +62,7 @@ public void setupTest() {

   @Override
   S3InputStream newInputStream(S3Client s3Client, S3URI uri) {
-    return new S3InputStream(s3Client, uri) {
+    return new S3InputStream(s3Client, uri, CONTENT_LENGTH) {
       @Override
       void resetForRetry() throws IOException {
         resetForRetryCounter.incrementAndGet();
@@ -122,6 +124,7 @@ public void testSeekWithFlakyStreamNonRetryableException(IOException exception)
   private static Stream<Arguments> retryableExceptions() {
     return Stream.of(
         Arguments.of(
+            new ConnectionClosedException("connection closed exception"),
             new SocketTimeoutException("socket timeout exception"),
             new SSLException("some ssl exception")));
   }
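For context, these parameterized exceptions feed the retry policy inside S3InputStream (its .withMaxRetries(3) builder is visible in the S3InputStream.java diff below). The following is a minimal sketch of that Failsafe retry pattern, assuming Failsafe 3.x; RetryingRead and readWithRetry are illustrative names, not code from this PR, and the real stream re-opens the S3 object via resetForRetry() between attempts.

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.List;
import javax.net.ssl.SSLException;
import org.apache.http.ConnectionClosedException;

class RetryingRead {
  // Mirrors the RETRYABLE_EXCEPTIONS list as it stands after this PR.
  private static final List<Class<? extends Throwable>> RETRYABLE =
      List.of(
          SSLException.class,
          SocketTimeoutException.class,
          SocketException.class,
          ConnectionClosedException.class);

  static int readWithRetry(InputStream in) {
    RetryPolicy<Integer> policy =
        RetryPolicy.<Integer>builder()
            .handle(RETRYABLE) // only these exception types trigger a retry
            .withMaxRetries(3)
            .build();
    // Naive retry: re-reads from the same stream. The production code resets
    // its position and re-opens the object before each attempt.
    return Failsafe.with(policy).get(in::read);
  }
}
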
TestS3InputStream.java

@@ -46,6 +46,8 @@ public class TestS3InputStream {
   private final S3Client s3 = MinioUtil.createS3Client(MINIO);
   private final Random random = new Random(1);

+  private static final int CONTENT_LENGTH = 1024 * 1024 * 10; // 10MB
+
   @BeforeEach
   public void before() {
     createBucket("bucket");
@@ -57,13 +59,13 @@ public void testRead() throws Exception {
   }

   S3InputStream newInputStream(S3Client s3Client, S3URI uri) {
-    return new S3InputStream(s3Client, uri);
+    return new S3InputStream(s3Client, uri, CONTENT_LENGTH);
   }

   protected void testRead(S3Client s3Client) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/read.dat");
-    int dataSize = 1024 * 1024 * 10;
-    byte[] data = randomData(dataSize);
+
+    byte[] data = randomData(CONTENT_LENGTH);

     writeS3Data(uri, data);
@@ -121,9 +123,9 @@ public void testRangeRead() throws Exception {

   protected void testRangeRead(S3Client s3Client) throws Exception {
     S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat");
-    int dataSize = 1024 * 1024 * 10;
-    byte[] expected = randomData(dataSize);
-    byte[] actual = new byte[dataSize];
+
+    byte[] expected = randomData(CONTENT_LENGTH);
+    byte[] actual = new byte[CONTENT_LENGTH];

     long position;
     int offset;
@@ -139,13 +141,13 @@ protected void testRangeRead(S3Client s3Client) throws Exception {
     readAndCheckRanges(in, expected, position, actual, offset, length);

     // last 1k
-    position = dataSize - 1024;
-    offset = dataSize - 1024;
+    position = CONTENT_LENGTH - 1024;
+    offset = CONTENT_LENGTH - 1024;
     readAndCheckRanges(in, expected, position, actual, offset, length);

     // middle 2k
-    position = dataSize / 2 - 1024;
-    offset = dataSize / 2 - 1024;
+    position = CONTENT_LENGTH / 2 - 1024;
+    offset = CONTENT_LENGTH / 2 - 1024;
     length = 1024 * 2;
     readAndCheckRanges(in, expected, position, actual, offset, length);
   }
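The readAndCheckRanges helper is defined outside this hunk. A plausible reconstruction, assuming Iceberg's RangeReadable contract (readFully(position, buffer, offset, length)) and AssertJ, would look like the following; treat it as a sketch rather than the file's actual helper.

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.util.Arrays;
import org.apache.iceberg.io.RangeReadable;

class RangeCheck {
  // Positioned read into a slice of the destination buffer, then compare that
  // slice against the same slice of the expected data. Assumes the test data
  // fits in an int-sized array, as it does in these tests.
  static void readAndCheckRanges(
      RangeReadable in, byte[] expected, long position, byte[] actual, int offset, int length)
      throws IOException {
    in.readFully(position, actual, offset, length);
    assertThat(Arrays.copyOfRange(actual, offset, offset + length))
        .isEqualTo(Arrays.copyOfRange(expected, (int) position, (int) position + length));
  }
}
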
S3InputFile.java

@@ -76,7 +76,7 @@ public SeekableInputStream newStream() {
     if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) {
       return AnalyticsAcceleratorUtil.newStream(this);
     }
-    return new S3InputStream(client(), uri(), s3FileIOProperties(), metrics());
+    return new S3InputStream(client(), uri(), s3FileIOProperties(), metrics(), getLength());

Review thread on this line:

Contributor: Re: getLength(). While this does not appear to have any impact as such, it does add a lot of noise to the logs.

If this is only used for logging, there can be cases where getLength() turns into a getObjectMetadata call to S3. Is that a valid tradeoff?

Author: That read() causes the abort to fail, as it throws the exception. The abort() code swallows the exception right now, but it shows up as:

software.amazon.awssdk.core.exception.RetryableException: Data read has a different checksum than expected. Was 0x4dd4fa955ccf4a27e2f635a22948298d, but expected 0x00000000000000000000000000000000. This commonly means that the data was corrupted between the client and service.

WARN S3InputStream: An error occurred while aborting the stream

And this happens pretty frequently.

I'm still quite new to Iceberg, but from my understanding, by this point we should already have the length, so this shouldn't result in a new HEAD call. I expect that either:

  • it's been passed down from the Avro metadata files, or
  • the Parquet reader has already asked for it to figure out which bytes it needs for the footer.

I don't think this getLength() call will cause an extra HEAD, but I could totally be wrong. What do you think?

Contributor: > by this point we should already have the length and this shouldn't result in a new HEAD call

Iceberg's metadata.json and manifest list would not have a content length, but subsequent reads down the Iceberg metadata tree (i.e. manifest / data / delete files) should have it.

How about correcting the error message instead, if it is too much noise?

Author: So you're saying we remove the LOG.warn("An error occurred while aborting the stream", e);? I'm not sure we want that abort() to fail silently.

I'm trying to figure out what it really means when abort() fails on a stream that threw an exception. For example, what's happening right now is:

  • We opened a stream from 0 to EoF, where EoF is 500MB.
  • After reading 100MB, a retryable exception was thrown.
  • In resetForRetry(), abort() is called, but it fails because the `read()` fails.

My expectation is that since the underlying stream already failed, it has released that connection, and the abort()/close() is a no-op at this point. But I'm concerned that the abort() failing might mean that connections aren't getting released properly. Will try to confirm.

singhpk234 (Contributor), Oct 14, 2025: I am not saying we remove it, I am saying we rephrase it.

> that the abort() failing might mean that connections aren't getting released properly. Will try to confirm.

Agreed, it would be good to confirm whether a failure while aborting the stream leads to the connection not being released back into the pool. If it does, then I think it is a valid tradeoff vs. introducing a new getLength() call for some objects.

abort() was introduced here as an optimization in the first place, to reuse connections; please see https://github.com/apache/iceberg/pull/7262/files#r1156363240
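The assumption running through this thread, that getLength() only costs a HEAD request the first time anything asks for it and is cached afterwards, follows the lazy-metadata pattern sketched below with the AWS SDK v2. LazyLengthFile is an illustration of that assumption, not Iceberg's actual S3InputFile code.

import org.apache.iceberg.aws.s3.S3URI;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

class LazyLengthFile {
  private final S3Client s3;
  private final S3URI uri;
  private Long length; // null until the first lookup

  LazyLengthFile(S3Client s3, S3URI uri) {
    this.s3 = s3;
    this.uri = uri;
  }

  long getLength() {
    if (length == null) {
      // Only the first call pays for a HEAD request; later calls are free.
      HeadObjectResponse head =
          s3.headObject(
              HeadObjectRequest.builder().bucket(uri.bucket()).key(uri.key()).build());
      length = head.contentLength();
    }
    return length;
  }
}
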

   }

   @Override
25 changes: 20 additions & 5 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 import javax.net.ssl.SSLException;
+import org.apache.http.ConnectionClosedException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.FileIOMetricsContext;
 import org.apache.iceberg.io.IOUtil;
@@ -53,7 +54,11 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
   private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);

   private static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS =
-      ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class);
+      ImmutableList.of(
+          SSLException.class,
+          SocketTimeoutException.class,
+          SocketException.class,
+          ConnectionClosedException.class);

   private final StackTraceElement[] createStack;
   private final S3Client s3;
@@ -63,6 +68,7 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
   private InputStream stream;
   private long pos = 0;
   private long next = 0;
+  private long contentLength = 0;
   private boolean closed = false;

   private final Counter readBytes;
@@ -86,15 +92,20 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
           .withMaxRetries(3)
           .build();

-  S3InputStream(S3Client s3, S3URI location) {
-    this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics());
+  S3InputStream(S3Client s3, S3URI location, long contentLength) {

Review thread on this line:

Contributor: Can you make a new constructor for these arguments?

danielcweeks (Contributor), Oct 14, 2025: This class is protected/private so we don't necessarily need to create a new constructor, but I'm not convinced we actually should do this.


+    this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics(), contentLength);
   }

   S3InputStream(
-      S3Client s3, S3URI location, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) {
+      S3Client s3,
+      S3URI location,
+      S3FileIOProperties s3FileIOProperties,
+      MetricsContext metrics,
+      long contentLength) {
     this.s3 = s3;
     this.location = location;
     this.s3FileIOProperties = s3FileIOProperties;
+    this.contentLength = contentLength;

     this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
     this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
@@ -278,14 +289,18 @@ private void closeStream(boolean closeQuietly) throws IOException {

   private void abortStream() {
     try {
-      if (stream instanceof Abortable && stream.read() != -1) {
+      if (stream instanceof Abortable && remainingInCurrentRequest() > 0) {

Review thread on this line:

Contributor: Rather than adding the length check here, which adds extra requests when the length isn't provided, why can't we just remove the stream.read() check and quietly swallow errors on the abort path? I'm not sure we really need to validate that the stream isn't consumed in order to abort. Even then, we can just ignore the exception, since the stream is effectively closed at that point.

Author: @danielcweeks if we remove the read() and also don't do the remainingInCurrentRequest() check, then we will abort() the stream even when we're at EoF. From what I understand, calling abort() discards the connection, while close() allows the connection to be reused.

What is not clear to me is whether the same behaviour holds when you're at EoF. What I would like to avoid is removing connections from the pool when they could be reused. I am trying to confirm this behaviour with the AWS SDK team, but do let me know if you have any advice here.

Author: Since this is quite low level and depends on the HTTP client implementation, I haven't been able to find a clear answer yet. But I agree on not making the getLength() call just for this; my understanding was that the length would already be available at this point.

nastra (Contributor), Oct 15, 2025: To add some historical context on why the stream.read() != -1 check was added: #7262 (comment)

I think we should keep the read there and just remove the noisy warning messages, as @bryanck already called out in #7262 (comment). Otherwise the connection will be invalidated and removed from the pool, and thus won't be reused.

         ((Abortable) stream).abort();
       }
     } catch (Exception e) {
       LOG.warn("An error occurred while aborting the stream", e);
     }
   }

+  private long remainingInCurrentRequest() {

Review thread on this line:

Contributor: [nit] remainingInCurrentRequest should be bytesRemainingInCurrentRequest.


+    return this.contentLength - this.pos;
+  }
+
   public void setSkipSize(int skipSize) {
     this.skipSize = skipSize;
   }
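Taken together, the close path after this change reduces to the decision sketched below: with a known content length, "are unread bytes left?" becomes arithmetic on contentLength and pos instead of a probing read() that can itself throw. CloseDecision is illustrative only; the real closeStream also handles null streams, quiet closes, and metrics.

import java.io.IOException;
import java.io.InputStream;
import software.amazon.awssdk.http.Abortable;

final class CloseDecision {
  private CloseDecision() {}

  static void closeStream(InputStream stream, long pos, long contentLength) throws IOException {
    long remaining = contentLength - pos; // remainingInCurrentRequest() in the PR
    if (stream instanceof Abortable && remaining > 0) {
      // Unread bytes remain: draining them just to reuse the connection would
      // be wasted work, so abort() and give the connection up instead.
      ((Abortable) stream).abort();
    } else {
      // Fully consumed (or not abortable): close() lets the underlying HTTP
      // client return the connection to the pool for reuse.
      stream.close();
    }
  }
}
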
37 changes: 34 additions & 3 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
@@ -20,33 +20,44 @@

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

 @ExtendWith(MockitoExtension.class)
 public final class TestS3InputStream {

   @Mock private S3Client s3Client;
-  @Mock private InputStream inputStream;
+  private AbortableInputStream inputStream;

   private S3InputStream s3InputStream;

+  private static final int CONTENT_LENGTH = 1024;
+

   @BeforeEach
   void before() {
+    byte[] writeValue = new byte[CONTENT_LENGTH];
+    Arrays.fill(writeValue, (byte) 1);
+
+    inputStream =
+        spy(AbortableInputStream.create(new ByteArrayInputStream(new byte[CONTENT_LENGTH])));
     when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
         .thenReturn(inputStream);
-    s3InputStream = new S3InputStream(s3Client, mock());
+    s3InputStream = new S3InputStream(s3Client, mock(), CONTENT_LENGTH);
   }

   @Test
@@ -62,4 +73,24 @@ void testReadTailClosesTheStream() throws IOException {

     verify(inputStream).close();
   }

+  @Test
+  void testAbortIsCalledAfterPartialRead() throws IOException {
+    byte[] buff = new byte[500];
+    s3InputStream.read(buff);
+
+    // close after reading partial object, should call abort
+    s3InputStream.close();
+    verify(inputStream).abort();
+  }
+
+  @Test
+  void testAbortIsCalledAfterFullRead() throws IOException {
+    byte[] buff = new byte[CONTENT_LENGTH];
+    s3InputStream.read(buff);
+
+    // If we're at EoF, this should not call abort.
+    s3InputStream.close();
+    verify(inputStream, never()).abort();
+  }
 }