@@ -239,13 +239,15 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
ChunkInputStream current = chunkStreams.get(chunkIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);

if (numBytesRead != numBytesToRead) {
// This implies that there is either data loss or corruption in the
// chunk entries. Even EOF in the current stream would be covered in
// this case.
throw new IOException(String.format(
"Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
current.getChunkName(), current.getLength(), numBytesRead));
"Inconsistent read for chunkName=%s length=%d numBytesToRead= %d " +
"numBytesRead=%d", current.getChunkName(), current.getLength(),
numBytesToRead, numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
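
For context, the check above runs inside a loop that drains one chunk stream at a time until the caller's buffer is satisfied. A stripped-down sketch of that multi-stream read loop, with toy types and a remaining[] array standing in for the per-stream bookkeeping (none of this is the actual Ozone code):

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

public class MultiStreamReadSketch {
  // Reads up to len bytes by draining a list of sub-streams in order.
  // remaining[i] is the number of unread bytes left in streams.get(i).
  static int read(List<InputStream> streams, long[] remaining,
      byte[] b, int off, int len) throws IOException {
    int totalReadLen = 0;
    for (int i = 0; i < streams.size() && len > 0; i++) {
      if (remaining[i] == 0) {
        continue;
      }
      int numBytesToRead = (int) Math.min(len, remaining[i]);
      int numBytesRead = streams.get(i).read(b, off, numBytesToRead);
      if (numBytesRead != numBytesToRead) {
        // Mirrors the patched check: a short read here means data loss or
        // corruption (EOF included), so fail loudly rather than return
        // bad data to the caller.
        throw new IOException("Inconsistent read: expected "
            + numBytesToRead + " bytes, got " + numBytesRead);
      }
      remaining[i] -= numBytesRead;
      totalReadLen += numBytesRead;
      off += numBytesRead;
      len -= numBytesRead;
    }
    return totalReadLen;
  }
}
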
@@ -315,6 +317,11 @@ public synchronized void seek(long pos) throws IOException {
// Reset the previous chunkStream's position
chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();

+ // Reset all the chunkStreams above the chunkIndex. We do this to reset
+ // any previous reads which might have updated the chunkPosition.
+ for (int index = chunkIndex + 1; index < chunkStreams.size(); index++) {
+ chunkStreams.get(index).seek(0);
+ }
// seek to the proper offset in the ChunkInputStream
chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
chunkIndexOfPrevPosition = chunkIndex;
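
The reset loop added above guards against stale read positions: after a backward seek lands in an earlier chunk, the later chunk streams may still remember where a previous sequential read left off, and the next read would silently skip data. A minimal sketch of the failure mode and the fix, using a toy stream class rather than the real ChunkInputStream (all names here are illustrative):

import java.util.ArrayList;
import java.util.List;

class ToyStream {
  private long pos;
  void seek(long p) { pos = p; }
  long getPos() { return pos; }
}

public class SeekResetDemo {
  public static void main(String[] args) {
    List<ToyStream> streams = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      streams.add(new ToyStream());
    }
    // A sequential read has advanced every stream.
    for (ToyStream s : streams) {
      s.seek(100);
    }
    // Seek back into stream 0. Without a reset, streams 1 and 2 still
    // report position 100, so the next sequential read would silently
    // skip their first 100 bytes.
    streams.get(0).seek(10);
    // The fix mirrored in the patch: rewind everything after the target.
    for (int i = 1; i < streams.size(); i++) {
      streams.get(i).seek(0);
    }
  }
}
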
@@ -52,7 +52,6 @@ public class ChunkInputStream extends InputStream implements Seekable {
private XceiverClientSpi xceiverClient;
private boolean verifyChecksum;
private boolean allocated = false;

// Buffer to store the chunk data read from the DN container
private List<ByteBuffer> buffers;

@@ -75,7 +74,7 @@ public class ChunkInputStream extends InputStream implements Seekable {

private static final int EOF = -1;

ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
XceiverClientSpi xceiverClient, boolean verifyChecksum) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
@@ -520,6 +519,9 @@ private boolean chunkStreamEOF() {
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
+ // We should not reset bufferOffset and bufferLength here, because
+ // getPos() relies on them when chunkStreamEOF() checks whether the
+ // chunk has been read completely.
}

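
The comment above captures the key invariant: even after the buffers are released, the stream's reported position is still derived from the extent of the last buffer. A simplified sketch of that bookkeeping, with field names following the patch but logic that is only an approximation of the real ChunkInputStream:

class PositionSketch {
  // Offset of the most recently read buffer within the chunk, and its
  // length; both survive a buffer release in this sketch.
  private final long bufferOffset;
  private final long bufferLength;
  private final long chunkLength;

  PositionSketch(long bufferOffset, long bufferLength, long chunkLength) {
    this.bufferOffset = bufferOffset;
    this.bufferLength = bufferLength;
    this.chunkLength = chunkLength;
  }

  // Position is derived from the last buffer even after it is released.
  long getPos() {
    return bufferOffset + bufferLength;
  }

  // EOF check built on getPos(); zeroing bufferOffset/bufferLength in a
  // releaseBuffers() call would make this wrongly report "not at EOF".
  boolean chunkStreamEOF() {
    return getPos() >= chunkLength;
  }
}
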
@@ -175,9 +175,10 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
// This implies that there is either data loss or corruption in the
// chunk entries. Even EOF in the current stream would be covered in
// this case.
- throw new IOException(String.format(
- "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
- current.getBlockID(), current.getLength(), numBytesRead));
+ throw new IOException(String.format("Inconsistent read for blockID=%s "
+ + "length=%d numBytesToRead=%d numBytesRead=%d",
+ current.getBlockID(), current.getLength(), numBytesToRead,
+ numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
@@ -239,6 +240,12 @@ public synchronized void seek(long pos) throws IOException {
// Reset the previous blockStream's position
blockStreams.get(blockIndexOfPrevPosition).resetPosition();

+ // Reset all the blockStreams above the blockIndex. We do this to reset
+ // any previous reads which might have updated the blockPosition and
+ // chunkIndex.
+ for (int index = blockIndex + 1; index < blockStreams.size(); index++) {
+ blockStreams.get(index).seek(0);
+ }
// 2. Seek the blockStream to the adjusted position
blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
blockIndexOfPrevPosition = blockIndex;
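
Both seek() implementations in this patch follow the same two-level pattern: locate the sub-stream covering the target position from a sorted offset array, seek within it relative to its start offset, and rewind every stream after it. A condensed sketch of that pattern with stand-in types (not the actual Ozone classes):

import java.util.List;

public class TwoLevelSeekSketch {

  interface SimpleSeekable {
    void seek(long pos);
  }

  static void seek(List<? extends SimpleSeekable> streams, long[] offsets,
      long pos) {
    // Find the last start offset <= pos. A linear scan is shown here;
    // since offsets are sorted, a binary search works equally well.
    int index = 0;
    while (index + 1 < offsets.length && offsets[index + 1] <= pos) {
      index++;
    }
    // Seek within the target stream, relative to its start offset.
    streams.get(index).seek(pos - offsets[index]);
    // Rewind every later stream so positions left over from earlier
    // reads cannot leak into the next sequential read.
    for (int i = index + 1; i < streams.size(); i++) {
      streams.get(i).seek(0);
    }
  }
}
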
@@ -37,6 +37,7 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@@ -118,6 +119,107 @@ private OzoneOutputStream createKey(String keyName, ReplicationType type,
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}


@Test
public void testSeekRandomly() throws Exception {

String keyName = getKeyName();
OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);

// Write data that spans more than 2 blocks.
int dataLength = (2 * blockSize) + (chunkSize);

Random rd = new Random();
byte[] inputData = new byte[dataLength];
rd.nextBytes(inputData);
key.write(inputData);
key.close();


KeyInputStream keyInputStream = (KeyInputStream) objectStore
.getVolume(volumeName).getBucket(bucketName).readKey(keyName)
.getInputStream();

// Seek to somewhere near the end.
validate(keyInputStream, inputData, dataLength - 200, 100);

// Now seek back to the start.
validate(keyInputStream, inputData, 0, 140);

validate(keyInputStream, inputData, 200, 300);

validate(keyInputStream, inputData, 30, 500);

randomSeek(dataLength, keyInputStream, inputData);

// Read entire key.
validate(keyInputStream, inputData, 0, dataLength);

// Repeat again and check.
randomSeek(dataLength, keyInputStream, inputData);

validate(keyInputStream, inputData, 0, dataLength);

keyInputStream.close();
}

/**
* Performs a series of seeks followed by reads, and validates that the
* data read matches the corresponding portion of the input data.
* @param dataLength total length of the data written to the key
* @param keyInputStream stream to seek and read from
* @param inputData the data originally written
* @throws Exception
*/
private void randomSeek(int dataLength, KeyInputStream keyInputStream,
byte[] inputData) throws Exception {
// Do random seek.
for (int i = 0; i < dataLength - 300; i += 20) {
validate(keyInputStream, inputData, i, 200);
}

// Seek towards the end and read in reverse order. These are partial
// chunk reads, since readLength is 20 while the chunk length is 100.
for (int i = dataLength - 100; i >= 100; i -= 20) {
validate(keyInputStream, inputData, i, 20);
}

// Start from the beginning and seek such that chunks are read partially.
for (int i = 0; i < dataLength - 300; i += 20) {
validate(keyInputStream, inputData, i, 90);
}

}

/**
* Seeks to the given position, reads readLength bytes, and validates that
* the data read matches the corresponding portion of the input data.
* @param keyInputStream stream to seek and read from
* @param inputData the data originally written
* @param seek position to seek to
* @param readLength number of bytes to read and verify
* @throws Exception
*/
private void validate(KeyInputStream keyInputStream, byte[] inputData,
long seek, int readLength) throws Exception {
keyInputStream.seek(seek);

byte[] readData = new byte[readLength];
keyInputStream.read(readData, 0, readLength);

byte[] expectedData = new byte[readLength];
System.arraycopy(inputData, (int) seek, expectedData, 0, readLength);

Assert.assertArrayEquals(expectedData, readData);
}
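
One caveat worth noting: InputStream.read(byte[], int, int) may return fewer bytes than requested, and validate() ignores the return value. A stricter variant would loop until all readLength bytes arrive; a minimal helper sketch, assuming only standard java.io semantics (ReadUtil and readFully are illustrative names, not part of the patch):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class ReadUtil {
  // Reads exactly len bytes into b starting at off, or throws if the
  // stream ends before len bytes have been read.
  static void readFully(InputStream in, byte[] b, int off, int len)
      throws IOException {
    int total = 0;
    while (total < len) {
      int n = in.read(b, off + total, len - total);
      if (n < 0) {
        throw new EOFException("Stream ended after " + total + " bytes");
      }
      total += n;
    }
  }
}
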


@Test
public void testSeek() throws Exception {
XceiverClientMetrics metrics = XceiverClientManager
@@ -153,8 +255,6 @@ public void testSeek() throws Exception {
// Seek operation should not result in any readChunk operation.
Assert.assertEquals(readChunkCount, metrics
.getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
- Assert.assertEquals(readChunkCount, metrics
- .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));

byte[] readData = new byte[chunkSize];
keyInputStream.read(readData, 0, chunkSize);