Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
Expand Down Expand Up @@ -71,8 +70,8 @@ public class ChunkInputStream extends InputStream
private final Supplier<Pipeline> pipelineSupplier;
private boolean verifyChecksum;
private boolean allocated = false;
// Buffer to store the chunk data read from the DN container
private List<ByteBuffer> buffers;
// Buffers to store the chunk data read from the DN container
private ByteBuffer[] buffers;

// Index of the buffer (within buffers) corresponding to the current position
private int bufferIndex;
Expand All @@ -89,6 +88,9 @@ public class ChunkInputStream extends InputStream
// of chunk data
private long bufferOffsetWrtChunkData;

// Index of the first buffer which has not been released
private int firstUnreleasedBufferIndex = 0;

// The number of bytes of chunk data residing in the buffers currently
private long buffersSize;

Expand Down Expand Up @@ -133,13 +135,11 @@ public synchronized int read() throws IOException {
// been released by now
Preconditions.checkState(buffers == null);
} else {
dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
dataout = Byte.toUnsignedInt(buffers[bufferIndex].get());
}

if (chunkStreamEOF()) {
// consumer might use getPos to determine EOF,
// so release buffers when serving the last byte of data
releaseBuffers();
if (bufferEOF()) {
releaseBuffers(bufferIndex);
}

return dataout;
Expand Down Expand Up @@ -179,15 +179,13 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(buffers == null);
return total != 0 ? total : EOF;
}
buffers.get(bufferIndex).get(b, off + total, available);
buffers[bufferIndex].get(b, off + total, available);
len -= available;
total += available;
}

if (chunkStreamEOF()) {
// smart consumers determine EOF by calling getPos()
// so we release buffers when serving the final bytes of data
releaseBuffers();
if (bufferEOF()) {
releaseBuffers(bufferIndex);
}
}

return total;
Expand Down Expand Up @@ -233,7 +231,7 @@ public synchronized long getPos() {
// BufferOffset w.r.t. ChunkData + BufferOffset w.r.t. buffers +
// Position of current Buffer
return bufferOffsetWrtChunkData + bufferOffsets[bufferIndex] +
buffers.get(bufferIndex).position();
buffers[bufferIndex].position();
}
if (buffersAllocated()) {
return bufferOffsetWrtChunkData + buffersSize;
Expand Down Expand Up @@ -289,7 +287,7 @@ private synchronized int prepareRead(int len) throws IOException {
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
ByteBuffer bb = buffers[bufferIndex];
return len > bb.remaining() ? bb.remaining() : len;
} else if (dataRemainingInChunk()) {
// There is more data in the chunk stream which has not
Expand Down Expand Up @@ -370,22 +368,23 @@ private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo)
buffers = readChunk(readChunkInfo);
buffersSize = readChunkInfo.getLen();

bufferOffsets = new long[buffers.size()];
bufferOffsets = new long[buffers.length];
int tempOffset = 0;
for (int i = 0; i < buffers.size(); i++) {
for (int i = 0; i < buffers.length; i++) {
bufferOffsets[i] = tempOffset;
tempOffset += buffers.get(i).limit();
tempOffset += buffers[i].limit();
}

bufferIndex = 0;
firstUnreleasedBufferIndex = 0;
allocated = true;
}

/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo)
protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;

Expand All @@ -405,13 +404,12 @@ protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo)
}

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList();
return readChunkResponse.getData().asReadOnlyByteBufferList()
.toArray(new ByteBuffer[0]);
} else if (readChunkResponse.hasDataBuffers()) {
List<ByteString> buffersList = readChunkResponse.getDataBuffers()
.getBuffersList();
return buffersList.stream()
.map(ByteString::asReadOnlyByteBuffer)
.collect(Collectors.toList());
return BufferUtils.getReadOnlyByteBuffersArray(buffersList);
} else {
throw new IOException("Unexpected error while reading chunk data " +
"from container. No data returned.");
Expand Down Expand Up @@ -508,21 +506,21 @@ private Pair<Long, Long> computeChecksumBoundaries(long startByteIndex,
private void adjustBufferPosition(long bufferPosition) {
// The bufferPosition is w.r.t the current buffers.
// Adjust the bufferIndex and position to the seeked bufferPosition.
if (bufferIndex >= buffers.size()) {
if (bufferIndex >= buffers.length) {
bufferIndex = Arrays.binarySearch(bufferOffsets, bufferPosition);
} else if (bufferPosition < bufferOffsets[bufferIndex]) {
bufferIndex = Arrays.binarySearch(bufferOffsets, 0, bufferIndex,
bufferPosition);
} else if (bufferPosition >= bufferOffsets[bufferIndex] +
buffers.get(bufferIndex).capacity()) {
buffers[bufferIndex].capacity()) {
bufferIndex = Arrays.binarySearch(bufferOffsets, bufferIndex + 1,
buffers.size(), bufferPosition);
buffers.length, bufferPosition);
}
if (bufferIndex < 0) {
bufferIndex = -bufferIndex - 2;
}

buffers.get(bufferIndex).position(
buffers[bufferIndex].position(
(int) (bufferPosition - bufferOffsets[bufferIndex]));

// Reset buffers > bufferIndex to position 0. We do this to reset any
Expand All @@ -531,8 +529,8 @@ private void adjustBufferPosition(long bufferPosition) {
// not required for this read. If a seek was done to a position in the
// previous indices, the buffer position reset would be performed in the
// seek call.
for (int i = bufferIndex + 1; i < buffers.size(); i++) {
buffers.get(i).position(0);
for (int i = bufferIndex + 1; i < buffers.length; i++) {
buffers[i].position(0);
}

// Reset the chunkPosition as chunk stream has been initialized i.e. the
Expand All @@ -545,7 +543,7 @@ private void adjustBufferPosition(long bufferPosition) {
*/
@VisibleForTesting
protected boolean buffersAllocated() {
return buffers != null && !buffers.isEmpty();
return buffers != null && buffers.length > 0;
}

/**
Expand All @@ -556,16 +554,17 @@ private boolean buffersHaveData() {
boolean hasData = false;

if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
while (bufferIndex < (buffers.length)) {
if (buffers[bufferIndex] != null &&
buffers[bufferIndex].hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
Preconditions.checkState(bufferIndex < buffers.length);
} else {
// no more buffers remaining
break;
Expand All @@ -578,7 +577,7 @@ private boolean buffersHaveData() {
}

private boolean buffersRemaining() {
return (bufferIndex < (buffers.size() - 1));
return (bufferIndex < (buffers.length - 1));
}

/**
Expand All @@ -588,7 +587,10 @@ private boolean buffersHavePosition(long pos) {
// Check if buffers have been allocated
if (buffersAllocated()) {
// Check if the current buffers cover the input position
return pos >= bufferOffsetWrtChunkData &&
// Released buffers should not be considered when checking if position
// is available
return pos >= bufferOffsetWrtChunkData +
bufferOffsets[firstUnreleasedBufferIndex] &&
pos < bufferOffsetWrtChunkData + buffersSize;
}
return false;
Expand All @@ -609,6 +611,21 @@ private boolean dataRemainingInChunk() {
return bufferPos < length;
}

/**
 * Check if the current buffer has been read till the end.
 *
 * @return true only if chunk data has been read into the buffers, the
 *         buffer at bufferIndex is still held, and it has no bytes left;
 *         false otherwise
 */
private boolean bufferEOF() {
  // Chunk data has not been read yet, or every buffer has already been
  // released. read() calls this method even on its EOF path, where the
  // buffers array is expected to be null, so it must not be dereferenced.
  if (!allocated || buffers == null) {
    return false;
  }

  // A partial release nulls out individual slots; a released slot has
  // nothing left to release, so it is not reported as EOF.
  ByteBuffer current = buffers[bufferIndex];
  return current != null && !current.hasRemaining();
}

/**
* Check if end of chunkStream has been reached.
*/
Expand All @@ -628,12 +645,38 @@ private boolean chunkStreamEOF() {
}
}


/**
* Release the buffers up to the given index.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit: make sure to change it before commit
Release a buffers --> Release the buffers

* @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the
* buffers must be released
*/
private void releaseBuffers(int releaseUptoBufferIndex) {
  final int lastBufferIndex = buffers.length - 1;

  if (releaseUptoBufferIndex != lastBufferIndex) {
    // Partial release: drop the references to every buffer up to and
    // including the given index and remember where the retained buffers
    // begin.
    for (int idx = 0; idx <= releaseUptoBufferIndex; idx++) {
      buffers[idx] = null;
    }
    firstUnreleasedBufferIndex = releaseUptoBufferIndex + 1;
    return;
  }

  // The last buffer is being released, so all the buffers go away. Record
  // the position just past the end of the buffered data in chunkPosition
  // first, so that getPos() can still report the current chunk position
  // once the buffers are gone.
  final long endOfLastBuffer = bufferOffsets[releaseUptoBufferIndex] +
      buffers[releaseUptoBufferIndex].capacity();
  chunkPosition = bufferOffsetWrtChunkData + endOfLastBuffer;

  // Release all the buffers
  releaseBuffers();
}

/**
* If EOF is reached, release the buffers.
*/
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
firstUnreleasedBufferIndex = 0;
// We should not reset bufferOffsetWrtChunkData and buffersSize here
// because when getPos() is called in chunkStreamEOF() we use these
// values and determine whether chunk is read completely or not.
Expand Down Expand Up @@ -671,7 +714,7 @@ public synchronized void unbuffer() {
}

@VisibleForTesting
public List<ByteBuffer> getCachedBuffers() {
return buffers;
public ByteBuffer[] getCachedBuffers() {
return BufferUtils.getReadOnlyByteBuffers(buffers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public DummyChunkInputStream(ChunkInfo chunkInfo,
}

@Override
protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
int offset = (int) readChunkInfo.getOffset();
int remainingToRead = (int) readChunkInfo.getLen();

Expand All @@ -72,7 +72,8 @@ protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
remainingToRead -= bufferLen;
}

return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
return BufferUtils.getReadOnlyByteBuffers(readByteBuffers)
.toArray(new ByteBuffer[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,26 @@ public void testSeek() throws Exception {
// buffers and the chunkPosition should be reset to -1.
Assert.assertEquals(-1, chunkStream.getChunkPosition());

// Seek to a position within the current buffers. Current buffers contain
// data from index 20 to 59. ChunkPosition should still not be used to
// set the position.
seekAndVerify(35);
// Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
// buffers are released after each checksum boundary is read. So the
// buffers should contain data from index 40 to 59.
// Seek to a position within the cached buffers. ChunkPosition should
// still not be used to set the position.
seekAndVerify(45);
Assert.assertEquals(-1, chunkStream.getChunkPosition());

// Seek to a position outside the current buffers. In this case, the
// Seek to a position outside the current cached buffers. In this case, the
// chunkPosition should be updated to the seeked position.
seekAndVerify(75);
Assert.assertEquals(75, chunkStream.getChunkPosition());

// Read upto checksum boundary should result in all the buffers being
// released and hence chunkPosition updated with current position of chunk.
seekAndVerify(25);
b = new byte[15];
chunkStream.read(b, 0, 15);
matchWithInputData(b, 25, 15);
Assert.assertEquals(40, chunkStream.getChunkPosition());
}

@Test
Expand Down Expand Up @@ -229,8 +239,9 @@ public void connectsToNewPipeline() throws Exception {
ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
clientFactory, pipelineRef::get, false, null) {
@Override
protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList()
.toArray(new ByteBuffer[0]);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -218,13 +217,21 @@ public long writeTo(GatheringByteChannel channel) throws IOException {

@Override
public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
ByteString result = ByteString.EMPTY;
for (ByteBuffer buffer : buffers) {
result = result.concat(f.apply(buffer));
}
return result;
}

@Override
public List<ByteString> toByteStringListImpl(
Function<ByteBuffer, ByteString> f) {
return buffers.stream().map(f).collect(Collectors.toList());
List<ByteString> byteStringList = new ArrayList<>();
for (ByteBuffer buffer : buffers) {
byteStringList.add(f.apply(buffer));
}
return byteStringList;
}

@Override
Expand Down
Loading