@@ -24,6 +24,7 @@
import java.util.StringJoiner;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
@@ -181,4 +182,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
@@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.zip.CRC32;

@@ -438,6 +439,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, (b) -> { });
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
@@ -462,8 +470,8 @@ public void readVectored(List<? extends FileRange> ranges,
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
@@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
@@ -306,4 +307,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
@@ -21,11 +21,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.io.Sizes.S_16K;
import static org.apache.hadoop.io.Sizes.S_1M;

@@ -136,4 +138,31 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}

/**
* Extension of {@link #readVectored(List, IntFunction)} where a {@code release(buffer)}
* operation may be invoked if problems surface during reads.
* <p>
* The {@code release} operation is invoked after an IOException
* to return the active buffer to a pool before the failure is
* reported in the future.
* <p>
* The default implementation calls {@link #readVectored(List, IntFunction)}.
* <p>
* Implementations SHOULD override this method if they can release buffers as
* part of their error handling.
* @param ranges the byte ranges to read
* @param allocate function to allocate ByteBuffer
* @param release consumer to release a ByteBuffer.
* @throws IOException any IOE.
* @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
* @throws NullPointerException if any argument is null.
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws IOException {
requireNonNull(release);
readVectored(ranges, allocate);
}

}
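
For illustration, here is a hedged sketch (not part of this PR) of how a caller
might use the new overload with a buffer pool, so that buffers are returned to
the pool on a failed range as well as after a successful read.
ElasticByteBufferPool is an existing Hadoop pool; the method name, file system
and path below are assumptions for the example.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ElasticByteBufferPool;

// Sketch: read two ranges into pooled buffers. If a range fails, the
// stream invokes the release consumer and the buffer goes back to the pool.
static void vectoredReadExample(FileSystem fs, Path path) throws Exception {
  ElasticByteBufferPool pool = new ElasticByteBufferPool();
  List<FileRange> ranges = Arrays.asList(
      FileRange.createFileRange(0, 4096),
      FileRange.createFileRange(1024 * 1024, 8192));
  try (FSDataInputStream in = fs.open(path)) {
    in.readVectored(ranges,
        length -> pool.getBuffer(false, length),   // allocate
        buffer -> pool.putBuffer(buffer));         // release on failure
    for (FileRange range : ranges) {
      ByteBuffer data = range.getData().get();     // await completion
      // ... consume the data ...
      pool.putBuffer(data);                        // return when done
    }
  }
}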
@@ -49,25 +49,29 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.impl.VectorIOBufferPool;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -319,74 +323,131 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}
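
LOG_BYTE_BUFFER_RELEASED, imported above from VectoredReadUtils, is defined
outside this diff; presumably it is a fallback release consumer that simply
logs the buffer being discarded, along these lines:

// Hedged sketch only: the real definition lives in VectoredReadUtils
// and is not shown in this diff.
public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
    buffer -> LOG.debug("Discarding buffer after failed read: {}", buffer);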

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
// Set up all of the futures, so that the caller can await on
// their completion.
for (FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
Contributor: seems like this was unnecessary. Any failure will automatically
be caught in the failed() method.
Contributor (Author): there is no IOE to be raised any more.
for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe);
}
}
final ByteBufferPool pool = new VectorIOBufferPool(allocate, release);
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
pool)
.initiateRead();
}
}
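
VectorIOBufferPool, imported above from org.apache.hadoop.fs.impl, is not
shown in this diff. Judging from its use here, it adapts the caller's
allocate/release functions to the ByteBufferPool interface; a minimal
hypothetical sketch (the real class may differ):

import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import org.apache.hadoop.io.ByteBufferPool;

// Hypothetical adapter in the shape of VectorIOBufferPool: getBuffer()
// delegates to the allocate function, putBuffer() to the release consumer.
final class AllocateReleasePool implements ByteBufferPool {
  private final IntFunction<ByteBuffer> allocate;
  private final Consumer<ByteBuffer> release;

  AllocateReleasePool(IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release) {
    this.allocate = allocate;
    this.release = release;
  }

  @Override
  public ByteBuffer getBuffer(boolean direct, int length) {
    // the direct flag is ignored: the allocate function decides the buffer type
    return allocate.apply(length);
  }

  @Override
  public void putBuffer(ByteBuffer buffer) {
    release.accept(buffer);
  }
}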

/**
* A CompletionHandler that implements readFully and translates back
* into the form of CompletionHandler that our users expect.
* <p>
* All reads are started in {@link #initiateRead()};
* the handler then receives callbacks on success via
* {@link #completed(Integer, Integer)}, and on failure via
* {@link #failed(Throwable, Integer)}.
* Each callback is mapped to the specific range in the read,
* and that range's outcome is updated.
*/
static class AsyncHandler implements CompletionHandler<Integer, Integer> {
private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
/** File channel to read from. */
private final AsynchronousFileChannel channel;

/** Ranges to fetch. */
private final List<? extends FileRange> ranges;

/**
* Pool providing allocate/release operations.
*/
private final ByteBufferPool allocateRelease;

/** Buffers being read. */
private final ByteBuffer[] buffers;

AsyncHandler(AsynchronousFileChannel channel,
List<? extends FileRange> ranges,
ByteBuffer[] buffers) {
/**
* Instantiate.
* @param channel open channel.
* @param ranges ranges to read.
* @param allocateRelease pool for allocating buffers, and releasing them on failure.
*/
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
final ByteBufferPool allocateRelease) {
this.channel = channel;
this.ranges = ranges;
this.buffers = buffers;
this.buffers = new ByteBuffer[ranges.size()];
this.allocateRelease = allocateRelease;
}

/**
* Initiate the read operation.
* <p>
* Allocate all buffers, queue the read into the channel,
* providing this object as the handler.
*/
private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
channel.read(buffers[i], range.getOffset(), i, this);
}
}

/**
* Callback for a completed full/partial read.
* <p>
* For an EOF the number of bytes may be -1.
* That is mapped to a {@link #failed(Throwable, Integer)} outcome.
* @param result The bytes read.
* @param rangeIndex range index within the range list.
*/
@Override
public void completed(Integer result, Integer r) {
FileRange range = ranges.get(r);
ByteBuffer buffer = buffers[r];
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
if (result == -1) {
failed(new EOFException("Read past End of File"), r);
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
} else {
if (buffer.remaining() > 0) {
// issue a read for the rest of the buffer
// QQ: What if this fails? It has the same handler.
channel.read(buffer, range.getOffset() + buffer.position(), r, this);
channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
} else {
// QQ: Why is this required? I think because we don't want the
// user to read data beyond limit.
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
}
}
}

/**
* The read of the range failed.
* <p>
* Release the buffer supplied for this range, then
* report the failure to the future via {@code completeExceptionally(exc)}.
* @param exc exception.
* @param rangeIndex range index within the range list.
*/
@Override
public void failed(Throwable exc, Integer r) {
LOG.debug("Failed while reading range {} ", r, exc);
ranges.get(r).getData().completeExceptionally(exc);
public void failed(Throwable exc, Integer rangeIndex) {
LOG.debug("Failed while reading range {} ", rangeIndex, exc);
// release the buffer
allocateRelease.putBuffer(buffers[rangeIndex]);
// report the failure.
ranges.get(rangeIndex).getData().completeExceptionally(exc);
}

}

@Override