Uses content length to determine when to abort the stream. #14329
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import javax.net.ssl.SSLException; | ||
| + import org.apache.http.ConnectionClosedException; | ||
| import org.apache.iceberg.exceptions.NotFoundException; | ||
| import org.apache.iceberg.io.FileIOMetricsContext; | ||
| import org.apache.iceberg.io.IOUtil; | ||
|
|
@@ -53,7 +54,11 @@ class S3InputStream extends SeekableInputStream implements RangeReadable { | |
| private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class); | ||
|
|
||
| private static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = | ||
| - ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class); | ||
| + ImmutableList.of( | ||
| + SSLException.class, | ||
| + SocketTimeoutException.class, | ||
| + SocketException.class, | ||
| + ConnectionClosedException.class); | ||
|
|
||
| private final StackTraceElement[] createStack; | ||
| private final S3Client s3; | ||
|
|
@@ -63,6 +68,7 @@ class S3InputStream extends SeekableInputStream implements RangeReadable { | |
| private InputStream stream; | ||
| private long pos = 0; | ||
| private long next = 0; | ||
| + private long contentLength = 0; | ||
| private boolean closed = false; | ||
|
|
||
| private final Counter readBytes; | ||
|
|
@@ -86,15 +92,20 @@ class S3InputStream extends SeekableInputStream implements RangeReadable { | |
| .withMaxRetries(3) | ||
| .build(); | ||
|
|
||
| - S3InputStream(S3Client s3, S3URI location) { | ||
| - this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics()); | ||
| + S3InputStream(S3Client s3, S3URI location, long contentLength) { | ||
|
Contributor
Can you add a new constructor that takes these arguments?
Contributor
This class is protected/private, so we don't necessarily need to create a new constructor, but I'm not convinced we should actually do this. |
||
| + this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics(), contentLength); | ||
| } | ||
|
|
||
| S3InputStream( | ||
| - S3Client s3, S3URI location, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { | ||
| + S3Client s3, | ||
| + S3URI location, | ||
| + S3FileIOProperties s3FileIOProperties, | ||
| + MetricsContext metrics, | ||
| + long contentLength) { | ||
| this.s3 = s3; | ||
| this.location = location; | ||
| this.s3FileIOProperties = s3FileIOProperties; | ||
| + this.contentLength = contentLength; | ||
|
|
||
| this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES); | ||
| this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS); | ||
|
|
@@ -278,14 +289,18 @@ private void closeStream(boolean closeQuietly) throws IOException { | |
|
|
||
| private void abortStream() { | ||
| try { | ||
| - if (stream instanceof Abortable && stream.read() != -1) { | ||
| + if (stream instanceof Abortable && remainingInCurrentRequest() > 0) { | ||
|
Contributor
Rather than adding the length check here, which adds extra requests when length isn't provided, why can't we just remove the
Author
@danielcweeks if we remove the What is not clear to me is whether the same behaviour holds if you're at EOF. What I would like to avoid is removing connections from the pool when they could be reused. I am trying to confirm the above behaviour with the AWS SDK team, but do let me know if you have any advice here.
Author
Since this is quite low level and depends on the HTTP client implementation, I haven't been able to find a clear answer yet. But I agree on not making the getLength() call just for this; my understanding was that the length would already be available at this point.
Contributor
To add some historical context on why the I think we should keep the read there and just remove the noisy warning messages, as @bryanck already called out in #7262 (comment). Otherwise the connection will be invalidated and removed from the pool, and thus won't be reused. |
||
| ((Abortable) stream).abort(); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn("An error occurred while aborting the stream", e); | ||
| } | ||
| } | ||
|
|
||
| + private long remainingInCurrentRequest() { | ||
|
Contributor
[nit] remainingInCurrentRequest should be bytesRemainingInCurrentRequest |
||
| + return this.contentLength - this.pos; | ||
| + } | ||
|
|
||
| public void setSkipSize(int skipSize) { | ||
| this.skipSize = skipSize; | ||
| } | ||
|
|
||
Re: getLength()
If this is just for logs, there can be cases where getLength() turns into a getObjectMetadata call to S3. Is that a valid tradeoff?
That read() causes the abort to fail, as it throws the exception. The abort() code will swallow the exception right now, but this shows up as
And this happens pretty frequently.
I'm still quite new to Iceberg, but from my understanding, by this point we should already have the length and this shouldn't result in a new HEAD call. I expect:
I don't think this getLength() call will cause an extra HEAD, but I could totally be wrong. What do you think?
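The failure mode described in this comment can be reproduced in isolation. The following is a minimal sketch, not the actual S3InputStream code: AbortableSketch is a hypothetical stand-in for the SDK's abortable interface, FailedStream simulates a connection that has already dropped, and the boolean return value stands in for the warning log.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for the SDK's abort capability; not the real AWS type.
interface AbortableSketch {
  void abort();
}

// Simulates a stream whose connection has already failed: every read() throws.
class FailedStream extends InputStream implements AbortableSketch {
  boolean aborted = false;

  @Override
  public int read() throws IOException {
    throw new IOException("connection closed (simulated)");
  }

  @Override
  public void abort() {
    aborted = true;
  }
}

final class ProbeBeforeAbortSketch {
  private ProbeBeforeAbortSketch() {}

  // Same shape as the pre-change check: probe with read(), then abort.
  // Returns true when the probe threw, which is the case that logs a warning.
  static boolean abortWithProbe(InputStream stream) {
    try {
      if (stream instanceof AbortableSketch && stream.read() != -1) {
        ((AbortableSketch) stream).abort();
      }
      return false;
    } catch (Exception e) {
      // The exception is swallowed here and abort() was never reached.
      return true;
    }
  }
}
```

Calling ProbeBeforeAbortSketch.abortWithProbe(new FailedStream()) takes the catch branch, so the stream's aborted flag stays false. Whether the real connection is still released in that case is the open question in this thread; the sketch does not answer it.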
Iceberg's metadata.json and manifest list would not have a content length, but subsequent reads down the Iceberg metadata tree, i.e. manifest / data / delete files, should have it.
How about correcting the error message, if this is too much noise?
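Since some files are opened without a known content length, a length-based check needs a policy for the unknown case that does not trigger an extra metadata request. A minimal sketch of one such policy follows; AbortDecisionSketch and shouldAbort are hypothetical names, and aborting when the length is unknown is an assumption made for illustration, not something this PR implements.

```java
import java.util.OptionalLong;

final class AbortDecisionSketch {
  private AbortDecisionSketch() {}

  // Decides whether to abort using only information that is already at hand.
  // knownLength is empty when the caller never supplied a content length.
  static boolean shouldAbort(OptionalLong knownLength, long pos) {
    if (!knownLength.isPresent()) {
      // Assumed policy: do not issue a request just to learn the length.
      return true;
    }
    // Abort only if unread bytes remain in the object.
    return knownLength.getAsLong() - pos > 0;
  }
}
```

With a known length of 100, shouldAbort returns true at position 40 and false at position 100. An alternative policy for the unknown case would be to fall back to the existing read() probe.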
So you're saying we should remove the LOG.warn("An error occurred while aborting the stream", e);? I'm not sure we want that abort() to fail silently.
I'm trying to figure out what it really means if an abort() fails on a stream that threw an exception. For example, what's happening right now is:
My expectation is that since the underlying stream already failed, it has released that connection and the abort()/close() is a no-op at this point. But I'm concerned that the abort() failing might mean that connections aren't getting released properly. Will try to confirm.
I am not saying remove it; I am suggesting we rephrase it.
Agreed, it would be good to confirm whether a failure when aborting the stream leads to the connection not being released to the pool; then I think it's a valid tradeoff vs. introducing a new getLength() for some objects.
abort() was introduced here as an optimization in the first place to reuse the connection; please see: https://github.com/apache/iceberg/pull/7262/files#r1156363240
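The alternative raised in this thread, keeping the read() probe and only quieting or rephrasing the message, might look roughly like the sketch below. All names here (AbortHandle, QuietAbortSketch, Outcome) are hypothetical, and the returned enum stands in for the choice of log level; this is an illustration under those assumptions, not the project's code.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical callback standing in for the stream's abort() method.
interface AbortHandle {
  void abort();
}

final class QuietAbortSketch {
  private QuietAbortSketch() {}

  enum Outcome { ABORTED, AT_EOF, PROBE_FAILED }

  // Keeps the read() probe so a fully consumed stream is not aborted,
  // and reports a failed probe as its own outcome instead of a warning.
  static Outcome abortQuietly(InputStream stream, AbortHandle handle) {
    try {
      if (stream.read() != -1) {
        handle.abort(); // unread data remains
        return Outcome.ABORTED;
      }
      return Outcome.AT_EOF; // nothing left to read, no abort needed
    } catch (IOException e) {
      // Assumed handling: a caller could log this at a lower level with a
      // clearer message rather than "An error occurred while aborting".
      return Outcome.PROBE_FAILED;
    }
  }
}
```

A caller could map PROBE_FAILED to a debug-level message and keep warnings for unexpected failures of abort() itself.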