Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@

import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.Function4RaisingIOE;

/**
* Utility class which implements helper methods used
* in vectored IO implementation.
*/
public final class VectoredReadUtils {

/**
 * Upper bound, in bytes, on the size of the temporary byte array
 * allocated by {@code readInDirectBuffer}: 64 KiB.
 */
private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;

/**
* Validate a single range.
* @param range file range.
Expand Down Expand Up @@ -114,21 +117,47 @@ private static void readNonByteBufferPositionedReadable(PositionedReadable strea
FileRange range,
ByteBuffer buffer) throws IOException {
if (buffer.isDirect()) {
buffer.put(readInDirectBuffer(stream, range));
readInDirectBuffer(range.getLength(),
buffer,
(position, buffer1, offset, length) -> {
stream.readFully(position, buffer1, offset, length);
return null;
});
buffer.flip();
} else {
stream.readFully(range.getOffset(), buffer.array(),
buffer.arrayOffset(), range.getLength());
}
}

private static byte[] readInDirectBuffer(PositionedReadable stream,
FileRange range) throws IOException {
// if we need to read data from a direct buffer and the stream doesn't
// support it, we allocate a byte array to use.
byte[] tmp = new byte[range.getLength()];
stream.readFully(range.getOffset(), tmp, 0, tmp.length);
return tmp;
/**
 * Read bytes into a byte buffer using an intermediate byte array.
 * <p>
 * The data is fetched in chunks of at most {@code TMP_BUFFER_MAX_SIZE}
 * bytes; each chunk is read into a temporary array through
 * {@code operation} and then copied into {@code buffer} with
 * {@link ByteBuffer#put(byte[], int, int)}.
 * The buffer is not flipped here; that is left to the caller.
 * <p>
 * The first argument handed to {@code operation} is the number of bytes
 * already read by this call: it starts at 0 and is therefore relative to
 * the start of this read, not an absolute offset. An operation which
 * reads from some other starting offset must add that offset itself.
 * @param length number of bytes to read; if 0 nothing is read.
 * @param buffer buffer to fill.
 * @param operation operation to use for reading data; invoked as
 * (relative position, destination array, offset in array, bytes to read).
 * @throws IOException any IOE.
 */
public static void readInDirectBuffer(int length,
    ByteBuffer buffer,
    Function4RaisingIOE<Integer, byte[], Integer,
        Integer, Void> operation) throws IOException {
  if (length == 0) {
    return;
  }
  // bytes read so far; also the relative position passed to the operation.
  int readBytes = 0;
  int tmpBufferMaxSize = Math.min(TMP_BUFFER_MAX_SIZE, length);
  byte[] tmp = new byte[tmpBufferMaxSize];
  while (readBytes < length) {
    // work from the remaining byte count: adding the chunk size to
    // readBytes could overflow int for lengths close to Integer.MAX_VALUE
    // and request a full chunk when fewer bytes remain.
    int currentLength = Math.min(tmpBufferMaxSize, length - readBytes);
    operation.apply(readBytes, tmp, 0, currentLength);
    buffer.put(tmp, 0, currentLength);
    readBytes += currentLength;
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;

/**
 * Function of arity 4 which may raise an IOException.
 * <p>
 * The interface declares a single abstract method, so it can be
 * implemented with a lambda expression or a method reference.
 * @param <I1> type of arg1.
 * @param <I2> type of arg2.
 * @param <I3> type of arg3.
 * @param <I4> type of arg4.
 * @param <R> return type.
 */
@FunctionalInterface
public interface Function4RaisingIOE<I1, I2, I3, I4, R> {

  /**
   * Apply the function.
   * @param i1 argument 1.
   * @param i2 argument 2.
   * @param i3 argument 3.
   * @param i4 argument 4.
   * @return return value.
   * @throws IOException any IOE.
   */
  R apply(I1 i1, I2 i2, I3 i3, I4 i4) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,31 @@ public void testReadVectored() throws Exception {
List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
FileRange.createFileRange(100_000, 100),
FileRange.createFileRange(200_000, 100));
runAndValidateVectoredRead(input);
}

/**
 * Vectored read of three ranges where the first and the last
 * range have a length of zero bytes.
 */
@Test
public void testReadVectoredZeroBytes() throws Exception {
  final FileRange emptyAtStart = FileRange.createFileRange(0, 0);
  final FileRange hundredBytes = FileRange.createFileRange(100_000, 100);
  final FileRange emptyAtEnd = FileRange.createFileRange(200_000, 0);
  runAndValidateVectoredRead(
      Arrays.asList(emptyAtStart, hundredBytes, emptyAtEnd));
}


private void runAndValidateVectoredRead(List<FileRange> input)
throws Exception {
Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1));
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
ArgumentMatchers.any(ByteBuffer.class));
// should not merge the ranges
VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
Mockito.verify(stream, Mockito.times(3))
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) {
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for (int b = 0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,20 +1152,13 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
private void populateBuffer(int length,
ByteBuffer buffer,
S3ObjectInputStream objectContent) throws IOException {

if (buffer.isDirect()) {
int readBytes = 0;
int offset = 0;
byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
while (readBytes < length) {
checkIfVectoredIOStopped();
int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
TMP_BUFFER_MAX_SIZE
: length - readBytes;
readByteArray(objectContent, tmp, 0, currentLength);
buffer.put(tmp, 0, currentLength);
offset = offset + currentLength;
readBytes = readBytes + currentLength;
}
VectoredReadUtils.readInDirectBuffer(length, buffer,
(position, tmp, offset, currentLength) -> {
readByteArray(objectContent, tmp, offset, currentLength);
return null;
});
buffer.flip();
} else {
readByteArray(objectContent, buffer.array(), 0, length);
Expand All @@ -1174,6 +1167,7 @@ private void populateBuffer(int length,
incrementBytesRead(length);
}


/**
* Read data into destination buffer from s3 object content.
* @param objectContent result from S3.
Expand Down