Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1564,8 +1564,11 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
CompletableFuture<T> result = new CompletableFuture<>();
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, () -> {
LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
try (AuditSpan span = auditSpan.activate()) {
return operation.apply();
} finally {
LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
}
}));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand All @@ -65,7 +67,6 @@
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

Expand Down Expand Up @@ -97,10 +98,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";

/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
Expand Down Expand Up @@ -242,6 +239,15 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
streamStatistics.inputPolicySet(inputPolicy.ordinal());
}

/**
 * Expose the input policy this stream is currently using.
 * Intended for use in tests.
 * @return the active input policy.
 */
@VisibleForTesting
public S3AInputPolicy getInputPolicy() {
  return this.inputPolicy;
}

/**
* Opens up the stream at specified target position and for given length.
*
Expand Down Expand Up @@ -604,7 +610,7 @@ public synchronized void close() throws IOException {
try {
stopVectoredIOOperations.set(true);
// close or abort the stream; blocking
awaitFuture(closeStream("close() operation", false, true));
closeStream("close() operation", false, true);
// end the client+audit span.
client.close();
// this is actually a no-op
Expand Down Expand Up @@ -664,18 +670,25 @@ private CompletableFuture<Boolean> closeStream(
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
object,
wrappedStream,
shouldAbort,
(int) remaining,
streamStatistics,
reason);

if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async io.
operation = CompletableFuture.completedFuture(
drain(shouldAbort, reason, remaining, object, wrappedStream));
// don't bother with async IO if the caller plans to wait for
// the result, there's an abort (which is fast), or
// there is not much data to read.
operation = CompletableFuture.completedFuture(drainer.apply());

} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort with references to the fields so they
// can be reused
operation = client.submit(
() -> drain(false, reason, remaining, object, wrappedStream));
// schedule an async drain/abort
operation = client.submit(drainer);
}

// either the stream is closed in the blocking call or the async call is
Expand All @@ -685,117 +698,6 @@ private CompletableFuture<Boolean> closeStream(
return operation;
}

/**
 * Drain or abort the supplied stream, recording the duration of the
 * operation in the stream statistics.
 * The method may be invoked directly or submitted for asynchronous
 * execution.
 * @param shouldAbort force an abort; used if explicitly requested.
 * @param reason reason for stream being closed; used in messages
 * @param remaining remaining bytes
 * @param requestObject http request object; needed to avoid GC issues.
 * @param inner stream to close.
 * @return was the stream aborted?
 */
private boolean drain(
    final boolean shouldAbort,
    final String reason,
    final long remaining,
    final S3Object requestObject,
    final S3ObjectInputStream inner) {

  boolean aborted;
  try {
    // the duration tracker is created from the requested abort flag and
    // wraps the whole drain/abort sequence.
    aborted = invokeTrackingDuration(
        streamStatistics.initiateInnerStreamClose(shouldAbort),
        () -> drainOrAbortHttpStream(
            shouldAbort,
            reason,
            remaining,
            requestObject,
            inner));
  } catch (IOException ignored) {
    // the catch clause exists only because invokeTrackingDuration()
    // declares IOException in its signature; report the requested flag.
    aborted = shouldAbort;
  }
  return aborted;
}

/**
 * Shut down the inner stream, either by draining and closing it or
 * by aborting it.
 * No exception is propagated to the caller.
 * A failure while draining or closing escalates the operation to
 * an abort.
 *
 * The {@link #closed} flag is not updated here.
 *
 * The stream is passed in as an argument so that the instance
 * {@link #wrappedStream} field can be reused as soon as this
 * method is submitted.
 * @param shouldAbort force an abort; used if explicitly requested.
 * @param reason reason for stream being closed; used in messages
 * @param remaining remaining bytes
 * @param requestObject http request object; needed to avoid GC issues.
 * @param inner stream to close.
 * @return was the stream aborted?
 */
private boolean drainOrAbortHttpStream(
    boolean shouldAbort,
    final String reason,
    final long remaining,
    final S3Object requestObject,
    final S3ObjectInputStream inner) {
  // touch the request object so that IDEs do not report it as unused.
  requireNonNull(requestObject);

  // working copy of the abort decision; flipped if the clean close fails.
  boolean abort = shouldAbort;

  if (!abort) {
    // clean close: read through to the end of the stream.
    // cleaner than an abort, but can be pathological on a multi-GB object.
    try {
      long bytesDrained = 0;
      final byte[] scratch = new byte[DRAIN_BUFFER_SIZE];
      int bytesRead;
      // a negative count signals that no more data is left.
      while ((bytesRead = inner.read(scratch)) >= 0) {
        bytesDrained += bytesRead;
      }
      LOG.debug("Drained stream of {} bytes", bytesDrained);

      inner.close();
      // statistics MUST be updated only after the close, so that a failed
      // drain/close which escalates to an abort does not also record the
      // statistics of the initial attempt.
      streamStatistics.streamClose(false, bytesDrained);
    } catch (Exception e) {
      // any failure escalates to an abort
      LOG.debug("When closing {} stream for {}, will abort the stream",
          uri, reason, e);
      abort = true;
    }
  }

  if (abort) {
    // abort instead of close: a plain close would read the rest of the
    // object payload from S3.
    LOG.debug("Aborting stream {}", uri);
    try {
      inner.abort();
    } catch (Exception e) {
      LOG.warn("When aborting {} stream after failing to close it for {}",
          uri, reason, e);
    }
    streamStatistics.streamClose(true, remaining);
  }

  final String outcome = abort ? "aborted" : "closed";
  LOG.debug("Stream {} {}: {}; remaining={}",
      uri, outcome, reason,
      remaining);
  return abort;
}

/**
* Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS
Expand Down Expand Up @@ -1080,8 +982,8 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ
int drainBytes = 0;
int readCount;
while (drainBytes < drainQuantity) {
if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
if (drainBytes + InternalConstants.DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[InternalConstants.DRAIN_BUFFER_SIZE];
readCount = objectContent.read(drainBuffer);
} else {
byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
Expand Down Expand Up @@ -1345,6 +1247,11 @@ public synchronized void unbuffer() {
closeStream("unbuffer()", false, false);
} finally {
streamStatistics.unbuffered();
if (inputPolicy.isAdaptive()) {
S3AInputPolicy policy = S3AInputPolicy.Random;
LOG.debug("Switching to seek policy {} after unbuffer() invoked", policy);
setInputPolicy(policy);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ private static InterruptedIOException translateInterruptedException(
} else {
String name = innerCause.getClass().getName();
if (name.endsWith(".ConnectTimeoutException")
|| name.endsWith(".ConnectionPoolTimeoutException")
|| name.endsWith("$ConnectTimeoutException")) {
// TCP connection or connection pool http timeout, matched on the shaded or unshaded class names
// com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
"Total count of bytes read from an input stream",
TYPE_COUNTER),
STREAM_READ_UNBUFFERED(
StreamStatisticNames.STREAM_READ_UNBUFFERED,
"Total count of input stream unbuffering operations",
TYPE_COUNTER),

/* Stream Write statistics */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public final class InternalConstants {
*/
public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true;

/**
 * Size of the buffer to create when draining a stream.
 * The value is used as the length of the byte array allocated for the drain.
 */
public static final int DRAIN_BUFFER_SIZE = 16384;

/**
 * Private constructor: the class cannot be instantiated from outside.
 */
private InternalConstants() {
}

Expand Down Expand Up @@ -97,6 +102,7 @@ private InternalConstants() {

static {
Set<String> keys = Stream.of(
Constants.ASYNC_DRAIN_THRESHOLD,
Constants.INPUT_FADVISE,
Constants.READAHEAD_RANGE)
.collect(Collectors.toSet());
Expand Down
Loading