diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 40bbd93b16f1..8404d31e85eb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -239,13 +239,15 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
       ChunkInputStream current = chunkStreams.get(chunkIndex);
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead = current.read(b, off, numBytesToRead);
+
       if (numBytesRead != numBytesToRead) {
         // This implies that there is either data loss or corruption in the
         // chunk entries. Even EOF in the current stream would be covered in
         // this case.
         throw new IOException(String.format(
-            "Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
-            current.getChunkName(), current.getLength(), numBytesRead));
+            "Inconsistent read for chunkName=%s length=%d numBytesToRead=%d " +
+            "numBytesRead=%d", current.getChunkName(), current.getLength(),
+            numBytesToRead, numBytesRead));
       }
       totalReadLen += numBytesRead;
       off += numBytesRead;
@@ -315,6 +317,11 @@ public synchronized void seek(long pos) throws IOException {
     // Reset the previous chunkStream's position
     chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
 
+    // Reset all the chunkStreams above the chunkIndex. We do this to reset
+    // any previous reads which might have updated the chunkPosition.
+    for (int index = chunkIndex + 1; index < chunkStreams.size(); index++) {
+      chunkStreams.get(index).seek(0);
+    }
     // seek to the proper offset in the ChunkInputStream
     chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
     chunkIndexOfPrevPosition = chunkIndex;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index f94d2d87340b..650c5ea0d17d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -52,7 +52,6 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private XceiverClientSpi xceiverClient;
   private boolean verifyChecksum;
   private boolean allocated = false;
-
   // Buffer to store the chunk data read from the DN container
   private List<ByteBuffer> buffers;
 
@@ -75,7 +74,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
-  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
+  ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
       XceiverClientSpi xceiverClient, boolean verifyChecksum) {
     this.chunkInfo = chunkInfo;
     this.length = chunkInfo.getLen();
@@ -520,6 +519,9 @@ private boolean chunkStreamEOF() {
   private void releaseBuffers() {
     buffers = null;
     bufferIndex = 0;
+    // We should not reset bufferOffset and bufferLength here because when
+    // getPos() is called in chunkStreamEOF() we use these values to
+    // determine whether the chunk is read completely or not.
   }
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index ecbb3290a7dc..ea81f432a392 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -175,9 +175,10 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
         // This implies that there is either data loss or corruption in the
         // chunk entries. Even EOF in the current stream would be covered in
         // this case.
-        throw new IOException(String.format(
-            "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.getBlockID(), current.getLength(), numBytesRead));
+        throw new IOException(String.format("Inconsistent read for blockID=%s " +
+            "length=%d numBytesToRead=%d numBytesRead=%d",
+            current.getBlockID(), current.getLength(), numBytesToRead,
+            numBytesRead));
       }
       totalReadLen += numBytesRead;
       off += numBytesRead;
@@ -239,6 +240,12 @@ public synchronized void seek(long pos) throws IOException {
     // Reset the previous blockStream's position
     blockStreams.get(blockIndexOfPrevPosition).resetPosition();
 
+    // Reset all the blockStreams above the blockIndex. We do this to reset
+    // any previous reads which might have updated the blockPosition and
+    // chunkIndex.
+    for (int index = blockIndex + 1; index < blockStreams.size(); index++) {
+      blockStreams.get(index).seek(0);
+    }
     // 2. Seek the blockStream to the adjusted position
     blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
     blockIndexOfPrevPosition = blockIndex;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289ea810..6e7e3285d6ee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -118,6 +119,107 @@ private OzoneOutputStream createKey(String keyName, ReplicationType type,
         .createKey(keyName, type, size, objectStore, volumeName, bucketName);
   }
 
+
+  @Test
+  public void testSeekRandomly() throws Exception {
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+
+    String keyName = getKeyName();
+    OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // Write data of more than 2 blocks.
+    int dataLength = (2 * blockSize) + (chunkSize);
+
+    Random rd = new Random();
+    byte[] inputData = new byte[dataLength];
+    rd.nextBytes(inputData);
+    key.write(inputData);
+    key.close();
+
+
+    KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream();
+
+    // Seek to somewhere near the end.
+    validate(keyInputStream, inputData, dataLength - 200, 100);
+
+    // Now seek to the start.
+    validate(keyInputStream, inputData, 0, 140);
+
+    validate(keyInputStream, inputData, 200, 300);
+
+    validate(keyInputStream, inputData, 30, 500);
+
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    // Read the entire key.
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    // Repeat again and check.
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    keyInputStream.close();
+  }
+
+  /**
+   * This method does random seeks and reads, and validates that the data
+   * read back matches the original input.
+   * @param dataLength
+   * @param keyInputStream
+   * @param inputData
+   * @throws Exception
+   */
+  private void randomSeek(int dataLength, KeyInputStream keyInputStream,
+      byte[] inputData) throws Exception {
+    // Do random seeks.
+    for (int i=0; i=100; i-=20) {
+      validate(keyInputStream, inputData, i, 20);
+    }
+
+    // Start from the beginning and seek such that we read chunks partially.
+    for (int i=0; i
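
A note on the read() changes above: in both BlockInputStream and KeyInputStream the request is already capped at the stream's remaining length, so a short read can only mean data loss, corruption, or an unexpected EOF; the patch only enriches the message so the exception reports how many bytes were requested as well as how many actually arrived. A minimal sketch of that guard pattern, under the same precondition; the ShortReadGuard helper below is illustrative, not Ozone code:

    import java.io.IOException;
    import java.io.InputStream;

    final class ShortReadGuard {
      private ShortReadGuard() { }

      // Assumes numBytesToRead was already capped at the remaining valid length,
      // so a short read indicates data loss/corruption rather than a normal EOF.
      static int readOrThrow(InputStream in, byte[] b, int off, int numBytesToRead,
          String chunkName, long chunkLength) throws IOException {
        int numBytesRead = in.read(b, off, numBytesToRead);
        if (numBytesRead != numBytesToRead) {
          throw new IOException(String.format(
              "Inconsistent read for chunkName=%s length=%d numBytesToRead=%d "
                  + "numBytesRead=%d", chunkName, chunkLength, numBytesToRead,
              numBytesRead));
        }
        return numBytesRead;
      }
    }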
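The seek() changes are the functional part of the patch: when seeking backwards, every chunk/block stream after the target index must be rewound, otherwise a cursor advanced by an earlier read leaks into the next read and produces exactly the "inconsistent read" failure above. The following is a self-contained sketch of that behaviour, using hypothetical PartStream/SeekResetSketch classes rather than the real ChunkInputStream/BlockInputStream:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SeekResetSketch {

      /** Minimal stand-in for a chunk/block stream: a byte[] with a cursor. */
      static class PartStream {
        private final byte[] data;
        private long pos;

        PartStream(byte[] data) {
          this.data = data;
        }

        long getRemaining() {
          return data.length - pos;
        }

        void seek(long newPos) {
          pos = newPos;
        }

        int read(byte[] b, int off, int len) {
          int n = (int) Math.min(len, getRemaining());
          System.arraycopy(data, (int) pos, b, off, n);
          pos += n;
          return n;
        }
      }

      private final List<PartStream> parts = new ArrayList<>();
      private final List<Long> partOffsets = new ArrayList<>();
      private long length;
      private int partIndex;

      void addPart(byte[] data) {
        partOffsets.add(length);
        parts.add(new PartStream(data));
        length += data.length;
      }

      /** Mirrors the fixed seek(): downstream parts are rewound so stale cursors cannot leak. */
      void seek(long pos) {
        // Locate the part containing pos (a linear scan keeps the sketch short).
        partIndex = 0;
        while (partIndex + 1 < parts.size() && pos >= partOffsets.get(partIndex + 1)) {
          partIndex++;
        }
        // Reset every part above partIndex; earlier reads may have advanced their cursors.
        for (int index = partIndex + 1; index < parts.size(); index++) {
          parts.get(index).seek(0);
        }
        parts.get(partIndex).seek(pos - partOffsets.get(partIndex));
      }

      int read(byte[] b, int off, int len) {
        int total = 0;
        while (len > 0 && partIndex < parts.size()) {
          PartStream current = parts.get(partIndex);
          int n = current.read(b, off, (int) Math.min(len, current.getRemaining()));
          total += n;
          off += n;
          len -= n;
          if (current.getRemaining() == 0) {
            partIndex++;
          }
        }
        return total;
      }

      public static void main(String[] args) {
        SeekResetSketch stream = new SeekResetSketch();
        byte[] all = new byte[300];
        new java.util.Random(42).nextBytes(all);
        stream.addPart(Arrays.copyOfRange(all, 0, 100));
        stream.addPart(Arrays.copyOfRange(all, 100, 200));
        stream.addPart(Arrays.copyOfRange(all, 200, 300));

        byte[] buf = new byte[250];
        stream.seek(30);
        stream.read(buf, 0, 250);   // advances cursors in parts 0..2
        stream.seek(0);             // must also rewind parts 1 and 2
        byte[] again = new byte[300];
        int n = stream.read(again, 0, 300);
        System.out.println(n == 300 && Arrays.equals(all, again)
            ? "read after seek(0) is consistent"
            : "stale cursor detected");
      }
    }

Dropping the reset loop from seek() in this sketch makes main() report a stale cursor: the last part is still positioned at byte 80 from the first read, so the second read comes back short and mismatched, which is the symptom the patched seek() prevents.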
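The test relies on a validate(keyInputStream, inputData, seekPos, readLength) helper whose body falls outside this excerpt. A sketch of what a helper with that signature typically does (seek, read fully, compare against the original bytes); it assumes java.util.Arrays and org.junit.Assert are imported, and should be treated as illustrative rather than the committed implementation:

    private void validate(KeyInputStream keyInputStream, byte[] inputData,
        long seekPos, int readLength) throws Exception {
      keyInputStream.seek(seekPos);

      byte[] readData = new byte[readLength];
      int totalRead = 0;
      // read() may return fewer bytes per call, so loop until readLength is reached.
      while (totalRead < readLength) {
        int n = keyInputStream.read(readData, totalRead, readLength - totalRead);
        Assert.assertTrue("unexpected EOF", n > 0);
        totalRead += n;
      }

      // The bytes read after the seek must match the original data at seekPos.
      byte[] expected = Arrays.copyOfRange(inputData, (int) seekPos,
          (int) seekPos + readLength);
      Assert.assertArrayEquals(expected, readData);
    }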