diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index c2e01487067b..f39ec8613307 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -113,7 +113,7 @@ public class OzoneClientConfig { type = ConfigType.SIZE, description = "Checksum will be computed for every bytes per checksum " + "number of bytes and stored sequentially. The minimum value for " - + "this config is 256KB.", + + "this config is 16KB.", tags = ConfigTag.CLIENT) private int bytesPerChecksum = 1024 * 1024; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index d088bb4a439a..7e6220209a22 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -502,4 +502,9 @@ private void handleReadError(IOException cause) throws IOException { refreshPipeline(cause); } + + @VisibleForTesting + public synchronized List getChunkStreams() { + return chunkStreams; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 657269d37b97..dc9123dc9461 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; @@ -34,6 +37,7 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -42,8 +46,11 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An {@link InputStream} called from BlockInputStream to read a chunk from the @@ -53,6 +60,9 @@ public class ChunkInputStream extends InputStream implements Seekable, CanUnbuffer { + private static final Logger LOG = + LoggerFactory.getLogger(ChunkInputStream.class); + private ChunkInfo chunkInfo; private final long length; private final BlockID blockID; @@ -66,13 +76,21 @@ public class ChunkInputStream extends InputStream // Index of the buffers corresponding to the current position of the buffers private int bufferIndex; + // bufferOffsets[i] stores the index of the first data byte in buffer i + // (buffers.get(i)) w.r.t first byte in the buffers. + // Let's say each buffer has a capacity of 40 bytes. 
The bufferOffset for + // the first buffer would always be 0 as this would be the first data byte + // in buffers. BufferOffset for the 2nd buffer would be 40 as bytes 0-39 + // would be stored in buffer 0. Hence, bufferOffsets[0] = 0, + // bufferOffsets[1] = 40, bufferOffsets[2] = 80, etc. + private long[] bufferOffsets = null; // The offset of the current data residing in the buffers w.r.t the start // of chunk data - private long bufferOffset; + private long bufferOffsetWrtChunkData; // The number of bytes of chunk data residing in the buffers currently - private long bufferLength; + private long buffersSize; // Position of the ChunkInputStream is maintained by this variable (if a // seek is performed. This position is w.r.t to the chunk only and not the @@ -197,7 +215,7 @@ public synchronized void seek(long pos) throws IOException { if (buffersHavePosition(pos)) { // The bufferPosition is w.r.t the current chunk. // Adjust the bufferIndex and position to the seeked position. - adjustBufferPosition(pos - bufferOffset); + adjustBufferPosition(pos - bufferOffsetWrtChunkData); } else { chunkPosition = pos; } @@ -212,10 +230,13 @@ public synchronized long getPos() { return length; } if (buffersHaveData()) { - return bufferOffset + buffers.get(bufferIndex).position(); + // BufferOffset w.r.t to ChunkData + BufferOffset w.r.t buffers + + // Position of current Buffer + return bufferOffsetWrtChunkData + bufferOffsets[bufferIndex] + + buffers.get(bufferIndex).position(); } if (buffersAllocated()) { - return bufferOffset + bufferLength; + return bufferOffsetWrtChunkData + buffersSize; } return 0; } @@ -259,7 +280,7 @@ private synchronized int prepareRead(int len) throws IOException { if (buffersHavePosition(chunkPosition)) { // The current buffers have the seeked position. Adjust the buffer // index and position to point to the chunkPosition. - adjustBufferPosition(chunkPosition - bufferOffset); + adjustBufferPosition(chunkPosition - bufferOffsetWrtChunkData); } else { // Read a required chunk data to fill the buffers with seeked // position data @@ -270,7 +291,7 @@ private synchronized int prepareRead(int len) throws IOException { // Data is available from buffers ByteBuffer bb = buffers.get(bufferIndex); return len > bb.remaining() ? bb.remaining() : len; - } else if (dataRemainingInChunk()) { + } else if (dataRemainingInChunk()) { // There is more data in the chunk stream which has not // been read into the buffers yet. readChunkFromContainer(len); @@ -301,36 +322,38 @@ private synchronized void readChunkFromContainer(int len) throws IOException { startByteIndex = chunkPosition; } else { // Start reading the chunk from the last chunkPosition onwards. - startByteIndex = bufferOffset + bufferLength; + startByteIndex = bufferOffsetWrtChunkData + buffersSize; } - // bufferOffset and bufferLength are updated below, but if read fails + // bufferOffsetWrtChunkData and buffersSize are updated after the data + // is read from Container and put into the buffers, but if read fails // and is retried, we need the previous position. Position is reset after // successful read in adjustBufferPosition() storePosition(); + long adjustedBuffersOffset, adjustedBuffersLen; if (verifyChecksum) { - // Update the bufferOffset and bufferLength as per the checksum - // boundary requirement. 
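The offset bookkeeping introduced above can be exercised on its own. The sketch below is not part of the patch; the class name and local setup are illustrative, while bufferOffsets, bufferOffsetWrtChunkData and the getPos() arithmetic mirror the names the new code tracks. It shows how the absolute position within the chunk is recovered from those three quantities.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the bufferOffsets[] bookkeeping used by
// ChunkInputStream (names mirror the patch, but this is not the patch code).
public final class BufferOffsetsExample {

  public static void main(String[] args) {
    // Three buffers of capacity 40, as in the comment above.
    List<ByteBuffer> buffers = Arrays.asList(
        ByteBuffer.allocate(40), ByteBuffer.allocate(40), ByteBuffer.allocate(40));

    // bufferOffsets[i] = index of the first byte of buffer i within the
    // cached region: 0, 40, 80.
    long[] bufferOffsets = new long[buffers.size()];
    long offset = 0;
    for (int i = 0; i < buffers.size(); i++) {
      bufferOffsets[i] = offset;
      offset += buffers.get(i).limit();
    }

    // Assume the cached region starts at byte 100 of the chunk and the
    // reader is 5 bytes into the second buffer.
    long bufferOffsetWrtChunkData = 100;
    int bufferIndex = 1;
    buffers.get(bufferIndex).position(5);

    // getPos() equivalent: 100 + 40 + 5 = 145.
    long pos = bufferOffsetWrtChunkData + bufferOffsets[bufferIndex]
        + buffers.get(bufferIndex).position();
    System.out.println("position w.r.t. chunk = " + pos);
  }
}
```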
- computeChecksumBoundaries(startByteIndex, len); + // Adjust the chunk offset and length to include required checksum + // boundaries + Pair adjustedOffsetAndLength = + computeChecksumBoundaries(startByteIndex, len); + adjustedBuffersOffset = adjustedOffsetAndLength.getLeft(); + adjustedBuffersLen = adjustedOffsetAndLength.getRight(); } else { // Read from the startByteIndex - bufferOffset = startByteIndex; - bufferLength = len; + adjustedBuffersOffset = startByteIndex; + adjustedBuffersLen = len; } // Adjust the chunkInfo so that only the required bytes are read from // the chunk. final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo) - .setOffset(bufferOffset + chunkInfo.getOffset()) - .setLen(bufferLength) + .setOffset(chunkInfo.getOffset() + adjustedBuffersOffset) + .setLen(adjustedBuffersLen) .build(); - ByteString byteString = readChunk(adjustedChunkInfo); - - buffers = byteString.asReadOnlyByteBufferList(); - bufferIndex = 0; - allocated = true; + readChunkDataIntoBuffers(adjustedChunkInfo); + bufferOffsetWrtChunkData = adjustedBuffersOffset; // If the stream was seeked to position before, then the buffer // position should be adjusted as the reads happen at checksum boundaries. @@ -339,14 +362,31 @@ private synchronized void readChunkFromContainer(int len) throws IOException { // 1. Stream was seeked to a position before the chunk was read // 2. Chunk was read from index < the current position to account for // checksum boundaries. - adjustBufferPosition(startByteIndex - bufferOffset); + adjustBufferPosition(startByteIndex - bufferOffsetWrtChunkData); + } + + private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo) + throws IOException { + buffers = readChunk(readChunkInfo); + buffersSize = readChunkInfo.getLen(); + + bufferOffsets = new long[buffers.size()]; + int tempOffset = 0; + for (int i = 0; i < buffers.size(); i++) { + bufferOffsets[i] = tempOffset; + tempOffset += buffers.get(i).limit(); + } + + bufferIndex = 0; + allocated = true; } /** * Send RPC call to get the chunk from the container. */ @VisibleForTesting - protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { + protected List readChunk(ChunkInfo readChunkInfo) + throws IOException { ReadChunkResponseProto readChunkResponse; try { @@ -364,7 +404,18 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { throw new IOException("Unexpected OzoneException: " + e.toString(), e); } - return readChunkResponse.getData(); + if (readChunkResponse.hasData()) { + return readChunkResponse.getData().asReadOnlyByteBufferList(); + } else if (readChunkResponse.hasDataBuffers()) { + List buffersList = readChunkResponse.getDataBuffers() + .getBuffersList(); + return buffersList.stream() + .map(ByteString::asReadOnlyByteBuffer) + .collect(Collectors.toList()); + } else { + throw new IOException("Unexpected error while reading chunk data " + + "from container. No data returned."); + } } private CheckedBiFunction byteStrings; + boolean isV0 = false; + + if (readChunkResponse.hasData()) { + ByteString byteString = readChunkResponse.getData(); + if (byteString.size() != reqChunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. 
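The boundary adjustment performed for checksum verification can be checked with a small worked example. This is a standalone sketch of the same arithmetic rather than the patch's computeChecksumBoundaries method; the numbers match the unit test further down (20-byte checksum units, a 100-byte chunk, and a 30-byte read starting at offset 25, which expands to offset 20 and length 40).

```java
// Standalone re-derivation of the checksum-boundary arithmetic; names are
// illustrative, not the actual method.
public final class ChecksumBoundaryExample {

  static long[] align(long startByteIndex, int dataLen, int bytesPerChecksum,
      long chunkLength) {
    long endByteIndex = startByteIndex + dataLen - 1;               // inclusive
    long adjustedOffset =
        (startByteIndex / bytesPerChecksum) * bytesPerChecksum;     // inclusive
    long endIndex =
        ((endByteIndex / bytesPerChecksum) + 1) * bytesPerChecksum; // exclusive
    long adjustedLen = Math.min(endIndex, chunkLength) - adjustedOffset;
    return new long[] {adjustedOffset, adjustedLen};
  }

  public static void main(String[] args) {
    // Read bytes 25..54 of a 100-byte chunk with 20-byte checksum units.
    long[] adjusted = align(25, 30, 20, 100);
    // Prints: offset=20 len=40 (checksum units 1 and 2 are read whole).
    System.out.println("offset=" + adjusted[0] + " len=" + adjusted[1]);
  }
}
```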
+ throw new OzoneChecksumException(String.format( + "Inconsistent read for chunk=%s len=%d bytesRead=%d", + reqChunkInfo.getChunkName(), reqChunkInfo.getLen(), + byteString.size())); + } + byteStrings = new ArrayList<>(); + byteStrings.add(byteString); + isV0 = true; + } else { + byteStrings = readChunkResponse.getDataBuffers().getBuffersList(); + long buffersLen = BufferUtils.getBuffersLen(byteStrings); + if (buffersLen != reqChunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new OzoneChecksumException(String.format( + "Inconsistent read for chunk=%s len=%d bytesRead=%d", + reqChunkInfo.getChunkName(), reqChunkInfo.getLen(), + buffersLen)); + } } if (verifyChecksum) { @@ -396,7 +464,8 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { chunkInfo.getOffset(); int bytesPerChecksum = checksumData.getBytesPerChecksum(); int startIndex = (int) (relativeOffset / bytesPerChecksum); - Checksum.verifyChecksum(byteString, checksumData, startIndex); + Checksum.verifyChecksum(byteStrings, checksumData, startIndex, + isV0); } }; @@ -413,18 +482,22 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { * * @param startByteIndex the first byte index to be read by client * @param dataLen number of bytes to be read from the chunk + * @return Adjusted (Chunk Offset, Chunk Length) which needs to be read + * from Container */ - private void computeChecksumBoundaries(long startByteIndex, int dataLen) { + private Pair computeChecksumBoundaries(long startByteIndex, + int dataLen) { int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum(); // index of the last byte to be read from chunk, inclusively. final long endByteIndex = startByteIndex + dataLen - 1; - bufferOffset = (startByteIndex / bytesPerChecksum) + long adjustedChunkOffset = (startByteIndex / bytesPerChecksum) * bytesPerChecksum; // inclusive final long endIndex = ((endByteIndex / bytesPerChecksum) + 1) * bytesPerChecksum; // exclusive - bufferLength = Math.min(endIndex, length) - bufferOffset; + long adjustedChunkLen = Math.min(endIndex, length) - adjustedChunkOffset; + return Pair.of(adjustedChunkOffset, adjustedChunkLen); } /** @@ -433,18 +506,34 @@ private void computeChecksumBoundaries(long startByteIndex, int dataLen) { * @param bufferPosition the position to which the buffers must be advanced */ private void adjustBufferPosition(long bufferPosition) { - // The bufferPosition is w.r.t the current chunk. - // Adjust the bufferIndex and position to the seeked chunkPosition. - long tempOffest = 0; - for (int i = 0; i < buffers.size(); i++) { - if (bufferPosition - tempOffest >= buffers.get(i).capacity()) { - tempOffest += buffers.get(i).capacity(); - } else { - bufferIndex = i; - break; - } + // The bufferPosition is w.r.t the current buffers. + // Adjust the bufferIndex and position to the seeked bufferPosition. 
+ if (bufferIndex >= buffers.size()) { + bufferIndex = Arrays.binarySearch(bufferOffsets, bufferPosition); + } else if (bufferPosition < bufferOffsets[bufferIndex]) { + bufferIndex = Arrays.binarySearch(bufferOffsets, 0, bufferIndex, + bufferPosition); + } else if (bufferPosition >= bufferOffsets[bufferIndex] + + buffers.get(bufferIndex).capacity()) { + bufferIndex = Arrays.binarySearch(bufferOffsets, bufferIndex + 1, + buffers.size(), bufferPosition); + } + if (bufferIndex < 0) { + bufferIndex = -bufferIndex - 2; + } + + buffers.get(bufferIndex).position( + (int) (bufferPosition - bufferOffsets[bufferIndex])); + + // Reset buffers > bufferIndex to position 0. We do this to reset any + // previous reads/ seeks which might have updated any buffer position. + // For buffers < bufferIndex, we do not need to reset the position as it + // not required for this read. If a seek was done to a position in the + // previous indices, the buffer position reset would be performed in the + // seek call. + for (int i = bufferIndex + 1; i < buffers.size(); i++) { + buffers.get(i).position(0); } - buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest)); // Reset the chunkPosition as chunk stream has been initialized i.e. the // buffers have been allocated. @@ -499,8 +588,8 @@ private boolean buffersHavePosition(long pos) { // Check if buffers have been allocated if (buffersAllocated()) { // Check if the current buffers cover the input position - return pos >= bufferOffset && - pos < bufferOffset + bufferLength; + return pos >= bufferOffsetWrtChunkData && + pos < bufferOffsetWrtChunkData + buffersSize; } return false; } @@ -514,7 +603,7 @@ private boolean dataRemainingInChunk() { if (chunkPosition >= 0) { bufferPos = chunkPosition; } else { - bufferPos = bufferOffset + bufferLength; + bufferPos = bufferOffsetWrtChunkData + buffersSize; } return bufferPos < length; @@ -532,8 +621,9 @@ private boolean chunkStreamEOF() { if (buffersHaveData() || dataRemainingInChunk()) { return false; } else { - Preconditions.checkState(bufferOffset + bufferLength == length, - "EOF detected, but not at the last byte of the chunk"); + Preconditions.checkState( + bufferOffsetWrtChunkData + buffersSize == length, + "EOF detected but not at the last byte of the chunk"); return true; } } @@ -544,9 +634,9 @@ private boolean chunkStreamEOF() { private void releaseBuffers() { buffers = null; bufferIndex = 0; - // We should not reset bufferOffset and bufferLength here because when - // getPos() is called in chunkStreamsEOF() we use these values and - // determine whether chunk is read completely or not. + // We should not reset bufferOffsetWrtChunkData and buffersSize here + // because when getPos() is called in chunkStreamEOF() we use these + // values and determine whether chunk is read completely or not. 
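The rewritten adjustBufferPosition above leans on the contract of Arrays.binarySearch: when the key is absent it returns -(insertionPoint) - 1, so the buffer containing the position sits at index -result - 2. A minimal, self-contained sketch of that index recovery (method and class names are illustrative):

```java
import java.util.Arrays;

// Demonstrates how a negative Arrays.binarySearch result is turned into the
// index of the buffer containing a position (as adjustBufferPosition does).
public final class BufferIndexLookupExample {

  static int containingBuffer(long[] bufferOffsets, long position) {
    int index = Arrays.binarySearch(bufferOffsets, position);
    if (index < 0) {
      // The position falls inside a buffer rather than on a boundary:
      // binarySearch returned -(insertionPoint) - 1.
      index = -index - 2;
    }
    return index;
  }

  public static void main(String[] args) {
    long[] bufferOffsets = {0, 40, 80};                       // three 40-byte buffers
    System.out.println(containingBuffer(bufferOffsets, 40));  // 1 (exact hit)
    System.out.println(containingBuffer(bufferOffsets, 55));  // 1
    System.out.println(containingBuffer(bufferOffsets, 95));  // 2
  }
}
```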
} /** @@ -579,4 +669,9 @@ public synchronized void unbuffer() { releaseBuffers(); releaseClient(); } + + @VisibleForTesting + public List getCachedBuffers() { + return buffers; + } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java index 5f3210d76bca..8453210d9f9d 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.storage; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; /** @@ -48,12 +50,29 @@ public DummyChunkInputStream(ChunkInfo chunkInfo, } @Override - protected ByteString readChunk(ChunkInfo readChunkInfo) { - ByteString byteString = ByteString.copyFrom(chunkData, - (int) readChunkInfo.getOffset(), - (int) readChunkInfo.getLen()); - getReadByteBuffers().add(byteString); - return byteString; + protected List readChunk(ChunkInfo readChunkInfo) { + int offset = (int) readChunkInfo.getOffset(); + int remainingToRead = (int) readChunkInfo.getLen(); + + int bufferCapacity = readChunkInfo.getChecksumData().getBytesPerChecksum(); + int bufferLen; + readByteBuffers.clear(); + while (remainingToRead > 0) { + if (remainingToRead < bufferCapacity) { + bufferLen = remainingToRead; + } else { + bufferLen = bufferCapacity; + } + ByteString byteString = ByteString.copyFrom(chunkData, + offset, bufferLen); + + readByteBuffers.add(byteString); + + offset += bufferLen; + remainingToRead -= bufferLen; + } + + return BufferUtils.getReadOnlyByteBuffers(readByteBuffers); } @Override diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java index a9296bc9a1a8..f6d9b8d4e12a 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.storage; import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; @@ -94,6 +96,19 @@ private void matchWithInputData(byte[] readData, int inputDataStartIndex, } } + private void matchWithInputData(List byteStrings, + int inputDataStartIndex, int length) { + int offset = inputDataStartIndex; + int totalBufferLen = 0; + for (ByteString byteString : byteStrings) { + int bufferLen = byteString.size(); + matchWithInputData(byteString.toByteArray(), offset, bufferLen); + offset += bufferLen; + totalBufferLen += bufferLen; + } + Assert.assertEquals(length, totalBufferLen); + } + /** * Seek to a position and verify through getPos(). */ @@ -123,10 +138,9 @@ public void testPartialChunkRead() throws Exception { // To read chunk data from index 0 to 49 (len = 50), we need to read // chunk from offset 0 to 60 as the checksum boundary is at every 20 // bytes. 
Verify that 60 bytes of chunk data are read and stored in the - // buffers. - matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(), - 0, 60); - + // buffers. Since checksum boundary is at every 20 bytes, there should be + // 60/20 number of buffers. + matchWithInputData(chunkStream.getReadByteBuffers(), 0, 60); } @Test @@ -152,8 +166,7 @@ public void testSeek() throws Exception { byte[] b = new byte[30]; chunkStream.read(b, 0, 30); matchWithInputData(b, 25, 30); - matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(), - 20, 40); + matchWithInputData(chunkStream.getReadByteBuffers(), 20, 40); // After read, the position of the chunkStream is evaluated from the // buffers and the chunkPosition should be reset to -1. @@ -216,8 +229,8 @@ public void connectsToNewPipeline() throws Exception { ChunkInputStream subject = new ChunkInputStream(chunkInfo, null, clientFactory, pipelineRef::get, false, null) { @Override - protected ByteString readChunk(ChunkInfo readChunkInfo) { - return ByteString.copyFrom(chunkData); + protected List readChunk(ChunkInfo readChunkInfo) { + return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList(); } }; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index d2d11120da44..aecf45c62021 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -133,6 +133,11 @@ public final class ScmConfigKeys { // 4 MB by default public static final String OZONE_SCM_CHUNK_SIZE_DEFAULT = "4MB"; + public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = + "ozone.chunk.read.buffer.default.size"; + public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT = + "64KB"; + public static final String OZONE_SCM_CHUNK_LAYOUT_KEY = "ozone.scm.chunk.layout"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index 3d9ded991b25..3ff2c2ef8df7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hdds.scm.protocolPB; import com.google.common.base.Preconditions; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.Function; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto.Builder; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DataBuffers; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto; @@ -34,8 +39,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion; + /** * A set of helper functions to create responses to container commands. */ @@ -204,26 +212,46 @@ public static ContainerCommandResponseProto getPutFileResponseSuccess( /** * Gets a response to the read small file call. - * @param msg - Msg - * @param data - Data + * @param request - Msg + * @param dataBuffers - Data * @param info - Info * @return Response. */ public static ContainerCommandResponseProto getGetSmallFileResponseSuccess( - ContainerCommandRequestProto msg, ByteString data, ChunkInfo info) { - - Preconditions.checkNotNull(msg); - - ReadChunkResponseProto.Builder readChunk = - ReadChunkResponseProto.newBuilder() - .setChunkData(info) - .setData((data)) - .setBlockID(msg.getGetSmallFile().getBlock().getBlockID()); + ContainerCommandRequestProto request, List<ByteString> dataBuffers, + ChunkInfo info) { + + Preconditions.checkNotNull(request); + + boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile()) + .equals(ContainerProtos.ReadChunkVersion.V0); + + ReadChunkResponseProto.Builder readChunk; + + if (isReadChunkV0) { + // V0 has all response data in a single ByteBuffer + ByteString combinedData = ByteString.EMPTY; + for (ByteString buffer : dataBuffers) { + combinedData = combinedData.concat(buffer); + } + readChunk = ReadChunkResponseProto.newBuilder() + .setChunkData(info) + .setData(combinedData) + .setBlockID(request.getGetSmallFile().getBlock().getBlockID()); + } else { + // V1 splits response data into a list of ByteBuffers + readChunk = ReadChunkResponseProto.newBuilder() + .setChunkData(info) + .setDataBuffers(DataBuffers.newBuilder() + .addAllBuffers(dataBuffers) + .build()) + .setBlockID(request.getGetSmallFile().getBlock().getBlockID()); + } GetSmallFileResponseProto.Builder getSmallFile = GetSmallFileResponseProto.newBuilder().setData(readChunk); - return getSuccessResponseBuilder(msg) + return getSuccessResponseBuilder(request) .setCmdType(Type.GetSmallFile) .setGetSmallFile(getSmallFile) .build(); @@ -250,13 +278,29 @@ public static ContainerCommandResponseProto getReadContainerResponse( } public static ContainerCommandResponseProto getReadChunkResponse( - ContainerCommandRequestProto request, ByteString data) { - - ReadChunkResponseProto.Builder response = - ReadChunkResponseProto.newBuilder() - .setChunkData(request.getReadChunk().getChunkData()) - .setData(data) - .setBlockID(request.getReadChunk().getBlockID()); + ContainerCommandRequestProto request, ChunkBuffer data, + Function<ByteBuffer, ByteString> byteBufferToByteString) { + + boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk()) + .equals(ContainerProtos.ReadChunkVersion.V0); + + ReadChunkResponseProto.Builder response; + + if (isReadChunkV0) { + // V0 has all response data in a single ByteBuffer + response = ReadChunkResponseProto.newBuilder() + .setChunkData(request.getReadChunk().getChunkData()) + .setData(data.toByteString(byteBufferToByteString)) +
.setBlockID(request.getReadChunk().getBlockID()); + } else { + // V1 splits response data into a list of ByteBuffers + response = ReadChunkResponseProto.newBuilder() + .setChunkData(request.getReadChunk().getChunkData()) + .setDataBuffers(DataBuffers.newBuilder() + .addAllBuffers(data.toByteStringList(byteBufferToByteString)) + .build()) + .setBlockID(request.getReadChunk().getBlockID()); + } return getSuccessResponseBuilder(request) .setReadChunk(response) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index f681d0dd2d9b..167ae8235264 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -226,7 +226,8 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .setChunkData(chunk); + .setChunkData(chunk) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); String id = xceiverClient.getPipeline().getClosestNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) @@ -489,6 +490,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, ContainerProtos.GetSmallFileRequestProto getSmallFileRequest = GetSmallFileRequestProto .newBuilder().setBlock(getBlock) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1) .build(); String id = client.getPipeline().getClosestNode().getUuidString(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java new file mode 100644 index 000000000000..d5d15396a3f9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.utils; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; + +public final class ClientCommandsUtils { + + /** Utility classes should not be constructed. 
**/ + private ClientCommandsUtils() { + + } + + public static ContainerProtos.ReadChunkVersion getReadChunkVersion( + ContainerProtos.ReadChunkRequestProto readChunkRequest) { + if (readChunkRequest.hasReadChunkVersion()) { + return readChunkRequest.getReadChunkVersion(); + } else { + return ContainerProtos.ReadChunkVersion.V0; + } + } + + public static ContainerProtos.ReadChunkVersion getReadChunkVersion( + ContainerProtos.GetSmallFileRequestProto getSmallFileRequest) { + if (getSmallFileRequest.hasReadChunkVersion()) { + return getSmallFileRequest.getReadChunkVersion(); + } else { + return ContainerProtos.ReadChunkVersion.V0; + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java new file mode 100644 index 000000000000..edd6b91d7b30 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.utils; + +/** + * This package contains utility classes for the SCM and client protocols. 
+ */ \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 873d28839b48..e9843bb58ac5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -333,7 +333,7 @@ public final class OzoneConfigKeys { "hdds.datanode.replication.work.dir"; - public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024; + public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 16 * 1024; public static final String OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index db7a31eea950..76f84c46ab5e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,6 +145,13 @@ public ChecksumData computeChecksum(ByteBuffer data) return computeChecksum(ChunkBuffer.wrap(data)); } + public ChecksumData computeChecksum(List byteStrings) + throws OzoneChecksumException { + final List buffers = + BufferUtils.getReadOnlyByteBuffers(byteStrings); + return computeChecksum(ChunkBuffer.wrap(buffers)); + } + public ChecksumData computeChecksum(ChunkBuffer data) throws OzoneChecksumException { if (checksumType == ChecksumType.NONE) { @@ -241,6 +249,46 @@ private static boolean verifyChecksum(ByteBuffer data, return checksumData.verifyChecksumDataMatches(computed, startIndex); } + /** + * Computes the ChecksumData for the input byteStrings and verifies that + * the checksums match with that of the input checksumData. + * @param byteStrings input data buffers list. Each byteString should + * correspond to one checksum. + * @param checksumData checksumData to match with + * @param startIndex index of first checksum in checksumData to match with + * data's computed checksum. + * @param isSingleByteString if true, there is only one byteString in the + * input list and it should be processes + * accordingly + * @throws OzoneChecksumException is thrown if checksums do not match + */ + public static boolean verifyChecksum(List byteStrings, + ChecksumData checksumData, int startIndex, boolean isSingleByteString) + throws OzoneChecksumException { + ChecksumType checksumType = checksumData.getChecksumType(); + if (checksumType == ChecksumType.NONE) { + // Checksum is set to NONE. No further verification is required. + return true; + } + + if (isSingleByteString) { + // The data is a single ByteString (old format). + return verifyChecksum(byteStrings.get(0), checksumData, startIndex); + } + + // The data is a list of ByteStrings. Each ByteString length should be + // the same as the number of bytes per checksum (except the last + // ByteString which could be smaller). 
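The layout assumed by the comment above, one buffer per checksum unit with only the last one allowed to be shorter, can be illustrated with plain JDK types. The sketch below uses java.util.zip.CRC32 purely as a stand-in; it is not the Ozone Checksum implementation, just a demonstration of the per-unit slicing that this verification path expects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Splits data into bytesPerChecksum-sized slices and computes one checksum
// per slice, mirroring the "one buffer per checksum unit" layout.
public final class PerUnitChecksumExample {

  public static void main(String[] args) {
    byte[] data = new byte[100];          // stand-in chunk data
    int bytesPerChecksum = 40;

    List<Long> checksums = new ArrayList<>();
    for (int off = 0; off < data.length; off += bytesPerChecksum) {
      int len = Math.min(bytesPerChecksum, data.length - off);
      CRC32 crc = new CRC32();
      crc.update(data, off, len);
      checksums.add(crc.getValue());
    }

    // 100 bytes with 40-byte units -> 3 slices (40, 40, 20) and 3 checksums.
    System.out.println("slices/checksums: " + checksums.size());
  }
}
```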
+ final List buffers = + BufferUtils.getReadOnlyByteBuffers(byteStrings); + + int bytesPerChecksum = checksumData.getBytesPerChecksum(); + Checksum checksum = new Checksum(checksumType, bytesPerChecksum); + final ChecksumData computed = checksum.computeChecksum( + ChunkBuffer.wrap(buffers)); + return checksumData.verifyChecksumDataMatches(computed, startIndex); + } + /** * Returns a ChecksumData with type NONE for testing. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java index 65f8a895a4b6..7d069cddc63e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java @@ -144,6 +144,17 @@ default ByteString toByteString(Function function) { return toByteStringImpl(b -> applyAndAssertFunction(b, function, this)); } + /** + * Convert this buffer(s) to a list of {@link ByteString}. + * The position and limit of this {@link ChunkBuffer} remains unchanged. + * The given function must preserve the position and limit + * of the input {@link ByteBuffer}. + */ + default List toByteStringList( + Function function) { + return toByteStringListImpl(b -> applyAndAssertFunction(b, function, this)); + } + // for testing default ByteString toByteString() { return toByteString(ByteStringConversion::safeWrap); @@ -151,6 +162,9 @@ default ByteString toByteString() { ByteString toByteStringImpl(Function function); + List toByteStringListImpl( + Function function); + static void assertInt(int expected, int computed, Supplier prefix) { if (expected != computed) { throw new IllegalStateException(prefix.get() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java index dd8163504bf5..afa39f8d1975 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -121,6 +122,12 @@ public ByteString toByteStringImpl(Function f) { return f.apply(buffer); } + @Override + public List toByteStringListImpl( + Function f) { + return Arrays.asList(f.apply(buffer)); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java index e9949ccae259..94ad552df72e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java @@ -19,6 +19,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.io.IOException; @@ -109,6 +112,11 @@ public int limit() { 
return limit; } + @Override + public boolean hasRemaining() { + return position() < limit; + } + @Override public ChunkBuffer rewind() { buffers.forEach(ByteBuffer::rewind); @@ -171,9 +179,29 @@ public ChunkBuffer duplicate(int newPosition, int newLimit) { } @Override + /** + * Returns the next buffer in the list irrespective of the bufferSize. + */ public Iterable iterate(int bufferSize) { - // currently not necessary; implement if needed - throw new UnsupportedOperationException(); + return () -> new Iterator() { + @Override + public boolean hasNext() { + return hasRemaining(); + } + + @Override + public ByteBuffer next() { + if (!hasRemaining()) { + throw new NoSuchElementException(); + } + findCurrent(); + ByteBuffer current = buffers.get(currentIndex); + final ByteBuffer duplicated = current.duplicate(); + duplicated.limit(current.limit()); + current.position(current.limit()); + return duplicated; + } + }; } @Override @@ -193,6 +221,12 @@ public ByteString toByteStringImpl(Function f) { return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat); } + @Override + public List toByteStringListImpl( + Function f) { + return buffers.stream().map(f).collect(Collectors.toList()); + } + @Override public String toString() { return getClass().getSimpleName() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java index e63ab973555b..ce7b1ebd92b0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.common; import com.google.common.base.Preconditions; +import java.util.stream.Collectors; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.io.IOException; @@ -273,6 +274,12 @@ public ByteString toByteStringImpl(Function f) { return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat); } + @Override + public List toByteStringListImpl( + Function f) { + return buffers.stream().map(f).collect(Collectors.toList()); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java new file mode 100644 index 000000000000..a8be69dec278 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.common.utils; + +import com.google.common.base.Preconditions; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +public final class BufferUtils { + + /** Utility classes should not be constructed. **/ + private BufferUtils() { + + } + + /** + * Assign an array of ByteBuffers. + * @param totalLen total length of all ByteBuffers + * @param bufferCapacity max capacity of each ByteBuffer + */ + public static ByteBuffer[] assignByteBuffers(long totalLen, + long bufferCapacity) { + Preconditions.checkArgument(totalLen > 0, "Buffer Length should be a " + + "positive integer."); + Preconditions.checkArgument(bufferCapacity > 0, "Buffer Capacity should " + + "be a positive integer."); + + int numBuffers = getNumberOfBins(totalLen, bufferCapacity); + + ByteBuffer[] dataBuffers = new ByteBuffer[numBuffers]; + int buffersAllocated = 0; + // For each ByteBuffer (except the last) allocate bufferLen of capacity + for (int i = 0; i < numBuffers - 1; i++) { + dataBuffers[i] = ByteBuffer.allocate((int) bufferCapacity); + buffersAllocated += bufferCapacity; + } + // For the last ByteBuffer, allocate as much space as is needed to fit + // remaining bytes + dataBuffers[numBuffers - 1] = ByteBuffer.allocate( + (int) (totalLen - buffersAllocated)); + return dataBuffers; + } + + /** + * Return a read only ByteBuffer list for the input ByteStrings list. + */ + public static List getReadOnlyByteBuffers( + List byteStrings) { + List buffers = new ArrayList<>(); + for (ByteString byteString : byteStrings) { + buffers.add(byteString.asReadOnlyByteBuffer()); + } + return buffers; + } + + public static ByteString concatByteStrings(List byteStrings) { + return byteStrings.stream().reduce(ByteString::concat).orElse(null); + } + + /** + * Return the summation of the length of all ByteStrings. + */ + public static long getBuffersLen(List buffers) { + long length = 0; + for (ByteString buffer : buffers) { + length += buffer.size(); + } + return length; + } + + /** + * Return the number of bins required to hold all the elements given a max + * capacity for each bin. + * @param numElements total number of elements to put in bin + * @param maxElementsPerBin max number of elements per bin + * @return number of bins + */ + public static int getNumberOfBins(long numElements, long maxElementsPerBin) { + return (int) Math.ceil((double) numElements / (double) maxElementsPerBin); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java new file mode 100644 index 000000000000..11741fae1ae6 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.common.utils; + +/** + * This package contains common utility classes for HDDS. + */ \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java index 1c73a316e5db..bc97e8e4b0db 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java @@ -38,6 +38,10 @@ public class ChunkInfo { private ChecksumData checksumData; private final Map metadata; + // For older clients reading chunks in V0 version (all read data should + // reside in one buffer). This variable should be set to true for older + // clients to maintain backward wire compatibility. + private boolean readDataIntoSingleBuffer = false; /** * Constructs a ChunkInfo. @@ -182,4 +186,12 @@ public String toString() { ", len=" + len + '}'; } + + public void setReadDataIntoSingleBuffer(boolean readDataIntoSingleBuffer) { + this.readDataIntoSingleBuffer = readDataIntoSingleBuffer; + } + + public boolean isReadDataIntoSingleBuffer() { + return readDataIntoSingleBuffer; + } } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 051868e1daed..ae81c843bffa 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -700,6 +700,19 @@ hdds.ratis.raft.grpc.flow.control.window for more information. + + ozone.chunk.read.buffer.default.size + 64KB + OZONE, SCM, CONTAINER, PERFORMANCE + + The default read buffer size during read chunk operations when checksum + is disabled. Chunk data will be cached in buffers of this capacity. + + For chunk data with checksum, the read buffer size will be the + same as the number of bytes per checksum + (ozone.client.bytes.per.checksum) corresponding to the chunk. 
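As a usage illustration of this property, the sketch below resolves the capacity the same way BlockManagerImpl does later in this patch. It assumes an OzoneConfiguration is available; the class name is illustrative.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

// Resolves the default read-buffer capacity (64KB unless overridden),
// mirroring the lookup added to BlockManagerImpl in this patch.
public final class ReadBufferCapacityExample {

  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    long defaultReadBufferCapacity = (long) conf.getStorageSize(
        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
        StorageUnit.BYTES);
    System.out.println("read buffer capacity = " + defaultReadBufferCapacity);
  }
}
```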
+ + ozone.scm.chunk.layout FILE_PER_BLOCK diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index b53fe7e337b9..9f2faffbea5b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -281,10 +281,22 @@ public static ContainerCommandResponseProto processForDebug( if (msg.hasReadChunk() || msg.hasGetSmallFile()) { ContainerCommandResponseProto.Builder builder = msg.toBuilder(); if (msg.hasReadChunk()) { - builder.getReadChunkBuilder().setData(REDACTED); + if (msg.getReadChunk().hasData()) { + builder.getReadChunkBuilder().setData(REDACTED); + } + if (msg.getReadChunk().hasDataBuffers()) { + builder.getReadChunkBuilder().getDataBuffersBuilder() + .addBuffers(REDACTED); + } } if (msg.hasGetSmallFile()) { - builder.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED); + if (msg.getGetSmallFile().getData().hasData()) { + builder.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED); + } + if (msg.getGetSmallFile().getData().hasDataBuffers()) { + builder.getGetSmallFileBuilder().getDataBuilder() + .getDataBuffersBuilder().addBuffers(REDACTED); + } } return builder.build(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index de2a058c9b61..424c08a14d0e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hdds.utils.Cache; import org.apache.hadoop.hdds.utils.ResourceLimitCache; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.util.Time; @@ -572,7 +573,8 @@ private ByteString readStateMachineData( ReadChunkRequestProto.Builder readChunkRequestProto = ReadChunkRequestProto.newBuilder() .setBlockID(writeChunkRequestProto.getBlockID()) - .setChunkData(chunkInfo); + .setChunkData(chunkInfo) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto) @@ -595,14 +597,22 @@ private ByteString readStateMachineData( } ReadChunkResponseProto responseProto = response.getReadChunk(); + ByteString data; + if (responseProto.hasData()) { + data = responseProto.getData(); + } else { + data = BufferUtils.concatByteStrings( + responseProto.getDataBuffers().getBuffersList()); + } - ByteString data = responseProto.getData(); // assert that the response has data in it. 
Preconditions - .checkNotNull(data, "read chunk data is null for chunk: %s", chunkInfo); + .checkNotNull(data, "read chunk data is null for chunk: %s", + chunkInfo); Preconditions.checkState(data.size() == chunkInfo.getLen(), "read chunk len=%s does not match chunk expected len=%s for chunk:%s", data.size(), chunkInfo.getLen(), chunkInfo); + return data; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 117e97384027..3b672bf3f87a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -49,6 +50,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; @@ -92,6 +94,8 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; +import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion; + import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -601,8 +605,18 @@ ContainerCommandResponseProto handleReadChunk( dispatcherContext = new DispatcherContext.Builder().build(); } - data = chunkManager - .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext); + boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk()) + .equals(ContainerProtos.ReadChunkVersion.V0); + if (isReadChunkV0) { + // For older clients, set ReadDataIntoSingleBuffer to true so that + // all the data read from chunk file is returned as a single + // ByteString. Older clients cannot process data returned as a list + // of ByteStrings. 
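Serving a V0 client ultimately means collapsing the buffer list back into a single ByteString, which is what BufferUtils.concatByteStrings and the state-machine path above do. Below is a minimal sketch of that collapse using only the protobuf ByteString API; since ByteString is immutable, concat() must be reassigned to take effect.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

// Collapses a V1-style list of buffers into the single ByteString that a
// V0 client expects. ByteString is immutable, so concat() is reassigned.
public final class ConcatBuffersExample {

  static ByteString concat(List<ByteString> buffers) {
    ByteString combined = ByteString.EMPTY;
    for (ByteString buffer : buffers) {
      combined = combined.concat(buffer);
    }
    return combined;
  }

  public static void main(String[] args) {
    List<ByteString> buffers = Arrays.asList(
        ByteString.copyFromUtf8("hello "), ByteString.copyFromUtf8("world"));
    System.out.println(concat(buffers).toStringUtf8());  // hello world
  }
}
```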
+ chunkInfo.setReadDataIntoSingleBuffer(true); + } + + data = chunkManager.readChunk(kvContainer, blockID, chunkInfo, + dispatcherContext); metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen()); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -614,8 +628,7 @@ ContainerCommandResponseProto handleReadChunk( Preconditions.checkNotNull(data, "Chunk data is null"); - ByteString byteString = data.toByteString(byteBufferToByteString); - return getReadChunkResponse(request, byteString); + return getReadChunkResponse(request, data, byteBufferToByteString); } /** @@ -833,21 +846,32 @@ ContainerCommandResponseProto handleGetSmallFile( .getBlockID()); BlockData responseData = blockManager.getBlock(kvContainer, blockID); - ContainerProtos.ChunkInfo chunkInfo = null; - ByteString dataBuf = ByteString.EMPTY; + ContainerProtos.ChunkInfo chunkInfoProto = null; + List dataBuffers = new ArrayList<>(); DispatcherContext dispatcherContext = new DispatcherContext.Builder().build(); for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { // if the block is committed, all chunks must have been committed. // Tmp chunk files won't exist here. + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunk); + boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile()) + .equals(ContainerProtos.ReadChunkVersion.V0); + if (isReadChunkV0) { + // For older clients, set ReadDataIntoSingleBuffer to true so that + // all the data read from chunk file is returned as a single + // ByteString. Older clients cannot process data returned as a list + // of ByteStrings. + chunkInfo.setReadDataIntoSingleBuffer(true); + } ChunkBuffer data = chunkManager.readChunk(kvContainer, blockID, - ChunkInfo.getFromProtoBuf(chunk), dispatcherContext); - ByteString current = data.toByteString(byteBufferToByteString); - dataBuf = dataBuf.concat(current); - chunkInfo = chunk; + chunkInfo, dispatcherContext); + dataBuffers.addAll(data.toByteStringList(byteBufferToByteString)); + chunkInfoProto = chunk; } - metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size()); - return getGetSmallFileResponseSuccess(request, dataBuf, chunkInfo); + metrics.incContainerBytesStats(Type.GetSmallFile, + BufferUtils.getBuffersLen(dataBuffers)); + return getGetSmallFileResponseSuccess(request, dataBuffers, + chunkInfoProto); } catch (StorageContainerException e) { return ContainerUtils.logAndReturnError(LOG, e, request); } catch (IOException ex) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index e1fac6fcb48b..f0fbee5b8d79 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -167,11 +167,11 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data, } /** - * Reads data from an existing chunk file. + * Reads data from an existing chunk file into a list of ByteBuffers. 
* * @param file file where data lives */ - public static void readData(File file, ByteBuffer buf, + public static void readData(File file, ByteBuffer[] buffers, long offset, long len, VolumeIOStats volumeIOStats) throws StorageContainerException { @@ -184,7 +184,7 @@ public static void readData(File file, ByteBuffer buf, try (FileChannel channel = open(path, READ_OPTIONS, NO_ATTRIBUTES); FileLock ignored = channel.lock(offset, len, true)) { - return channel.read(buf, offset); + return channel.position(offset).read(buffers); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -204,7 +204,9 @@ public static void readData(File file, ByteBuffer buf, validateReadSize(len, bytesRead); - buf.flip(); + for (ByteBuffer buf : buffers) { + buf.flip(); + } } /** @@ -347,5 +349,4 @@ private static ContainerProtos.Result translate(Exception cause) { return CONTAINER_INTERNAL_ERROR; } - } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 17c37c96ca68..fadd35975a3a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.db.BatchOperation; @@ -59,6 +61,9 @@ public class BlockManagerImpl implements BlockManager { private static final String NO_SUCH_BLOCK_ERR_MSG = "Unable to find the block."; + // Default Read Buffer capacity when Checksum is not present + private final long defaultReadBufferCapacity; + /** * Constructs a Block Manager. * @@ -67,6 +72,10 @@ public class BlockManagerImpl implements BlockManager { public BlockManagerImpl(ConfigurationSource conf) { Preconditions.checkNotNull(conf, "Config cannot be null"); this.config = conf; + this.defaultReadBufferCapacity = (long) config.getStorageSize( + ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY, + ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT, + StorageUnit.BYTES); } /** @@ -244,6 +253,11 @@ public long getCommittedBlockLength(Container container, BlockID blockID) } } + @Override + public long getDefaultReadBufferCapacity() { + return defaultReadBufferCapacity; + } + /** * Deletes an existing block. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java index 1c4a917d047c..26113a6611d3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java @@ -58,7 +58,7 @@ public class ChunkManagerDispatcher implements ChunkManager { ChunkManagerDispatcher(boolean sync, BlockManager manager) { handlers.put(FILE_PER_CHUNK, new FilePerChunkStrategy(sync, manager)); - handlers.put(FILE_PER_BLOCK, new FilePerBlockStrategy(sync)); + handlers.put(FILE_PER_BLOCK, new FilePerBlockStrategy(sync, manager)); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 7e7e5f45eb06..925ef71d115f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -22,11 +22,15 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; +import com.google.common.collect.Lists; + import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; @@ -35,6 +39,7 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -67,9 +72,12 @@ public class FilePerBlockStrategy implements ChunkManager { private final boolean doSyncWrite; private final OpenFiles files = new OpenFiles(); + private final long defaultReadBufferCapacity; - public FilePerBlockStrategy(boolean sync) { + public FilePerBlockStrategy(boolean sync, BlockManager manager) { doSyncWrite = sync; + this.defaultReadBufferCapacity = manager == null ? 
0 : + manager.getDefaultReadBufferCapacity(); } private static void checkLayoutVersion(Container container) { @@ -145,10 +153,37 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, long len = info.getLen(); long offset = info.getOffset(); - ByteBuffer data = ByteBuffer.allocate((int) len); - ChunkUtils.readData(chunkFile, data, offset, len, volumeIOStats); - return ChunkBuffer.wrap(data); + long bufferCapacity = 0; + if (info.isReadDataIntoSingleBuffer()) { + // Older client - read all chunk data into one single buffer. + bufferCapacity = len; + } else { + // Set buffer capacity to checksum boundary size so that each buffer + // corresponds to one checksum. If checksum is NONE, then set buffer + // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB). + ChecksumData checksumData = info.getChecksumData(); + + if (checksumData != null) { + if (checksumData.getChecksumType() == + ContainerProtos.ChecksumType.NONE) { + bufferCapacity = defaultReadBufferCapacity; + } else { + bufferCapacity = checksumData.getBytesPerChecksum(); + } + } + } + // If the buffer capacity is 0, set all the data into one ByteBuffer + if (bufferCapacity == 0) { + bufferCapacity = len; + } + + ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len, + bufferCapacity); + + ChunkUtils.readData(chunkFile, dataBuffers, offset, len, volumeIOStats); + + return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers)); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java index 159b0b2c731e..d7f994ee7c94 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java @@ -20,12 +20,15 @@ import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; @@ -65,10 +68,13 @@ public class FilePerChunkStrategy implements ChunkManager { private final boolean doSyncWrite; private final BlockManager blockManager; + private final long defaultReadBufferCapacity; public FilePerChunkStrategy(boolean sync, BlockManager manager) { doSyncWrite = sync; blockManager = manager; + this.defaultReadBufferCapacity = manager == null ? 0 : + manager.getDefaultReadBufferCapacity(); } private static void checkLayoutVersion(Container container) { @@ -222,7 +228,33 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, } long len = info.getLen(); - ByteBuffer data = ByteBuffer.allocate((int) len); + + long bufferCapacity = 0; + if (info.isReadDataIntoSingleBuffer()) { + // Older client - read all chunk data into one single buffer. 
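+ // (KeyValueHandler sets this flag for V0 ReadChunk and GetSmallFile
+ // requests.)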
+ bufferCapacity = len; + } else { + // Set buffer capacity to checksum boundary size so that each buffer + // corresponds to one checksum. If checksum is NONE, then set buffer + // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB). + ChecksumData checksumData = info.getChecksumData(); + + if (checksumData != null) { + if (checksumData.getChecksumType() == + ContainerProtos.ChecksumType.NONE) { + bufferCapacity = defaultReadBufferCapacity; + } else { + bufferCapacity = checksumData.getBytesPerChecksum(); + } + } + } + // If the buffer capacity is 0, set all the data into one ByteBuffer + if (bufferCapacity == 0) { + bufferCapacity = len; + } + + ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len, + bufferCapacity); long chunkFileOffset = 0; if (info.getOffset() != 0) { @@ -255,8 +287,8 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, if (file.exists()) { long offset = info.getOffset() - chunkFileOffset; Preconditions.checkState(offset >= 0); - ChunkUtils.readData(file, data, offset, len, volumeIOStats); - return ChunkBuffer.wrap(data); + ChunkUtils.readData(file, dataBuffers, offset, len, volumeIOStats); + return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers)); } } catch (StorageContainerException ex) { //UNABLE TO FIND chunk is not a problem as we will try with the @@ -264,7 +296,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID, if (ex.getResult() != UNABLE_TO_FIND_CHUNK) { throw ex; } - data.clear(); + dataBuffers = null; } } throw new StorageContainerException( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 72b104025b59..cab1a3ab7e2e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -90,6 +90,8 @@ List listBlock(Container container, long startLocalID, int count) long getCommittedBlockLength(Container container, BlockID blockID) throws IOException; + long getDefaultReadBufferCapacity(); + /** * Shutdown ContainerManager. 
*/ diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index e430a3018118..a035379bc905 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -245,6 +245,7 @@ public static ContainerCommandRequestProto getReadChunkRequest( ContainerProtos.ReadChunkRequestProto.newBuilder(); readRequest.setBlockID(request.getBlockID()); readRequest.setChunkData(request.getChunkData()); + readRequest.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); Builder newRequest = ContainerCommandRequestProto.newBuilder(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index 96d4228025ca..d0b3113aa3e5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -185,7 +185,7 @@ private void createToDeleteBlocks(ContainerSet containerSet, int numOfChunksPerBlock) throws IOException { ChunkManager chunkManager; if (layout == FILE_PER_BLOCK) { - chunkManager = new FilePerBlockStrategy(true); + chunkManager = new FilePerBlockStrategy(true, null); } else { chunkManager = new FilePerChunkStrategy(true, null); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index e6998fefbd1a..bdc5a42ac7af 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -38,6 +38,7 @@ .StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; @@ -178,8 +179,10 @@ public void testCreateContainerWithWriteChunk() throws IOException { response = hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null); Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); - Assert.assertEquals(response.getReadChunk().getData(), - writeChunkRequest.getWriteChunk().getData()); + ByteString responseData = BufferUtils.concatByteStrings( + response.getReadChunk().getDataBuffers().getBuffersList()); + Assert.assertEquals(writeChunkRequest.getWriteChunk().getData(), + responseData); } finally { ContainerMetrics.remove(); FileUtils.deleteDirectory(new File(testDir)); @@ -358,7 +361,8 @@ private ContainerCommandRequestProto getReadChunkRequest( ContainerProtos.ReadChunkRequestProto.Builder readChunkRequest = 
ContainerProtos.ReadChunkRequestProto.newBuilder() .setBlockID(writeChunk.getBlockID()) - .setChunkData(writeChunk.getChunkData()); + .setChunkData(writeChunk.getChunkData()) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); return ContainerCommandRequestProto.newBuilder() .setCmdType(ContainerProtos.Type.ReadChunk) .setContainerID(writeChunk.getBlockID().getContainerID()) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java index 55f1f60b9026..ce9c2aa59f58 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java @@ -81,7 +81,7 @@ public ChunkLayOutVersion getLayout() { FILE_PER_BLOCK { @Override public ChunkManager createChunkManager(boolean sync, BlockManager manager) { - return new FilePerBlockStrategy(sync); + return new FilePerBlockStrategy(sync, null); } @Override diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java index 7267314e62e1..3d9f03c9979e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java @@ -202,7 +202,8 @@ private ContainerCommandRequestProto getDummyCommandRequestProto( break; case ReadChunk: builder.setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder() - .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build()); + .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1).build()); break; case DeleteChunk: builder diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java index 5b64a378bca3..050946371c51 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats; import org.apache.hadoop.test.GenericTestUtils; @@ -83,8 +84,13 @@ public void concurrentReadOfSameFile() throws Exception { final int threadNumber = i; executor.execute(() -> { try { - ByteBuffer readBuffer = ByteBuffer.allocate((int) len); - ChunkUtils.readData(file, readBuffer, offset, len, stats); + ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len); + ChunkUtils.readData(file, readBuffers, offset, len, stats); + + // There should be only one element in readBuffers + 
Assert.assertEquals(1, readBuffers.length); + ByteBuffer readBuffer = readBuffers[0]; + LOG.info("Read data ({}): {}", threadNumber, new String(readBuffer.array(), UTF_8)); if (!Arrays.equals(array, readBuffer.array())) { @@ -161,8 +167,14 @@ public void serialRead() throws Exception { long len = data.limit(); long offset = 0; ChunkUtils.writeData(file, data, offset, len, stats, true); - ByteBuffer readBuffer = ByteBuffer.allocate((int) len); - ChunkUtils.readData(file, readBuffer, offset, len, stats); + + ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len); + ChunkUtils.readData(file, readBuffers, offset, len, stats); + + // There should be only one element in readBuffers + Assert.assertEquals(1, readBuffers.length); + ByteBuffer readBuffer = readBuffers[0]; + assertArrayEquals(array, readBuffer.array()); assertEquals(len, readBuffer.remaining()); } catch (Exception e) { @@ -193,13 +205,13 @@ public void readMissingFile() throws Exception { int len = 123; int offset = 0; File nonExistentFile = new File("nosuchfile"); - ByteBuffer buf = ByteBuffer.allocate(len); + ByteBuffer[] bufs = BufferUtils.assignByteBuffers(len, len); VolumeIOStats stats = new VolumeIOStats(); // when StorageContainerException e = LambdaTestUtils.intercept( StorageContainerException.class, - () -> ChunkUtils.readData(nonExistentFile, buf, offset, len, stats)); + () -> ChunkUtils.readData(nonExistentFile, bufs, offset, len, stats)); // then Assert.assertEquals(UNABLE_TO_FIND_CHUNK, e.getResult()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java index 8ccf9ee6a8c3..f3be6e2fb3bd 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java @@ -125,12 +125,17 @@ public void testPartialRead() throws Exception { subject.writeChunk(container, blockID, info, data, ctx); ChunkBuffer readData = subject.readChunk(container, blockID, info, ctx); - assertEquals(data.rewind(), readData.rewind()); + // data will be ChunkBufferImplWithByteBuffer and readData will return + // ChunkBufferImplWithByteBufferList. Hence, convert both ByteStrings + // before comparing. 
+ assertEquals(data.rewind().toByteString(), + readData.rewind().toByteString()); ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length); ChunkBuffer readData2 = subject.readChunk(container, blockID, info2, ctx); assertEquals(length, info2.getLen()); - assertEquals(data.duplicate(start, start + length), readData2.rewind()); + assertEquals(data.rewind().toByteString().substring(start, start + length), + readData2.rewind().toByteString()); } @Override diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 01cc4fff0a29..31947db18adc 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -398,15 +398,29 @@ message WriteChunkRequestProto { message WriteChunkResponseProto { } +enum ReadChunkVersion { + V0 = 0; // Response data is sent in a single ByteBuffer + V1 = 1; // Response data is split into multiple buffers +} + message ReadChunkRequestProto { required DatanodeBlockID blockID = 1; required ChunkInfo chunkData = 2; + optional ReadChunkVersion readChunkVersion = 3; } message ReadChunkResponseProto { required DatanodeBlockID blockID = 1; required ChunkInfo chunkData = 2; - required bytes data = 3; + // Chunk data should be returned in one of the two for + oneof responseData { + bytes data = 3; // Chunk data is returned as single buffer for V0 + DataBuffers dataBuffers = 4; // Chunk data is returned as a list of buffers + } +} + +message DataBuffers { + repeated bytes buffers = 1; } message DeleteChunkRequestProto { @@ -443,6 +457,7 @@ message PutSmallFileResponseProto { message GetSmallFileRequestProto { required GetBlockRequestProto block = 1; + optional ReadChunkVersion readChunkVersion = 2; } message GetSmallFileResponseProto { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index c8bf7db7fe5d..10d63d0682a0 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -385,4 +385,9 @@ public void unbuffer() { is.unbuffer(); } } + + @VisibleForTesting + public List getBlockStreams() { + return blockStreams; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java new file mode 100644 index 000000000000..2ad91a7c2522 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc.read; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; +import org.apache.hadoop.ozone.client.io.KeyInputStream; +import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests {@link ChunkInputStream}. + */ +public class TestChunkInputStream extends TestInputStreamBase { + + public TestChunkInputStream(ChunkLayOutVersion layout) { + super(layout); + } + + /** + * Test to verify that data read from chunks is stored in a list of buffers + * with max capacity equal to the bytes per checksum. + */ + @Test + public void testChunkReadBuffers() throws Exception { + String keyName = getNewKeyName(); + int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE); + byte[] inputData = writeRandomBytes(keyName, dataLength); + + KeyInputStream keyInputStream = getKeyInputStream(keyName); + + BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0); + block0Stream.initialize(); + + ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0); + + // To read 1 byte of chunk data, ChunkInputStream should get one full + // checksum boundary worth of data from Container and store it in buffers. + chunk0Stream.read(new byte[1]); + checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, + BYTES_PER_CHECKSUM); + + // Read > checksum boundary of data from chunk0 + int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2); + byte[] readData = readDataFromChunk(chunk0Stream, readDataLen, 0); + validateData(inputData, 0, readData); + + // The first checksum boundary size of data was already existing in the + // ChunkStream buffers. Once that data is read, the next checksum + // boundary size of data will be fetched again to read the remaining data. + // Hence there should be 1 checksum boundary size of data stored in the + // ChunkStreams buffers at the end of the read. + checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, + BYTES_PER_CHECKSUM); + + // Seek to a position in the third checksum boundary (so that current + // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM + // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of + // data being read into the buffers. There should be 2 buffers each with + // BYTES_PER_CHECKSUM capacity. + readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2); + int offset = 2 * BYTES_PER_CHECKSUM + 1; + readData = readDataFromChunk(chunk0Stream, readDataLen, offset); + validateData(inputData, offset, readData); + checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, + BYTES_PER_CHECKSUM); + + + // Read the full chunk data -1 and verify that all chunk data is read into + // buffers. We read CHUNK_SIZE - 1 as otherwise the buffers will be + // released once all chunk data is read. 
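+ // With CHUNK_SIZE = 1MB and BYTES_PER_CHECKSUM = 256KB, the assertion
+ // below expects CHUNK_SIZE / BYTES_PER_CHECKSUM = 4 cached buffers.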
+ readData = readDataFromChunk(chunk0Stream, CHUNK_SIZE - 1, 0); + validateData(inputData, 0, readData); + checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), + CHUNK_SIZE / BYTES_PER_CHECKSUM, BYTES_PER_CHECKSUM); + + // Read the last byte of chunk and verify that the buffers are released. + chunk0Stream.read(new byte[1]); + Assert.assertNull("ChunkInputStream did not release buffers after " + + "reaching EOF.", chunk0Stream.getCachedBuffers()); + + } + + private byte[] readDataFromChunk(ChunkInputStream chunkInputStream, + int readDataLength, int offset) throws IOException { + byte[] readData = new byte[readDataLength]; + chunkInputStream.seek(offset); + chunkInputStream.read(readData, 0, readDataLength); + return readData; + } + + private void checkBufferSizeAndCapacity(List buffers, + int expectedNumBuffers, long expectedBufferCapacity) { + Assert.assertEquals("ChunkInputStream does not have expected number of " + + "ByteBuffers", expectedNumBuffers, buffers.size()); + for (ByteBuffer buffer : buffers) { + Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong", + expectedBufferCapacity, buffer.capacity()); + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java new file mode 100644 index 000000000000..3735e8a1a9b5 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.client.rpc.read; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.utils.BufferUtils; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; +import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +@RunWith(Parameterized.class) +public class TestInputStreamBase { + + private MiniOzoneCluster cluster; + private OzoneConfiguration conf = new OzoneConfiguration(); + private OzoneClient client; + private ObjectStore objectStore; + + private String volumeName; + private String bucketName; + private String keyString; + + private ChunkLayOutVersion chunkLayout; + private static final Random RAND = new Random(); + + protected static final int CHUNK_SIZE = 1024 * 1024; // 1MB + protected static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB + protected static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB + protected static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; // 8MB + protected static final int BYTES_PER_CHECKSUM = 256 * 1024; // 256KB + + @Rule + public Timeout timeout = Timeout.seconds(300); + + @Parameterized.Parameters + public static Iterable parameters() { + return ChunkLayoutTestInfo.chunkLayoutParameters(); + } + + public TestInputStreamBase(ChunkLayOutVersion layout) { + this.chunkLayout = layout; + } + + /** + * Create a MiniDFSCluster for testing. 
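+ * The client is configured with bytesPerChecksum = BYTES_PER_CHECKSUM so
+ * that tests can assert the capacity of the cached read buffers.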
+ * @throws IOException + */ + @Before + public void init() throws Exception { + OzoneClientConfig config = new OzoneClientConfig(); + config.setBytesPerChecksum(BYTES_PER_CHECKSUM); + conf.setFromObject(config); + + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); + conf.setQuietMode(false); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, + StorageUnit.MB); + conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.toString()); + + ReplicationManagerConfiguration repConf = + conf.getObject(ReplicationManagerConfiguration.class); + repConf.setInterval(Duration.ofSeconds(1)); + conf.setFromObject(repConf); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(4) + .setTotalPipelineNumLimit(5) + .setBlockSize(BLOCK_SIZE) + .setChunkSize(CHUNK_SIZE) + .setStreamBufferFlushSize(FLUSH_SIZE) + .setStreamBufferMaxSize(MAX_FLUSH_SIZE) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getRpcClient(conf); + objectStore = client.getObjectStore(); + + volumeName = UUID.randomUUID().toString(); + bucketName = UUID.randomUUID().toString(); + keyString = UUID.randomUUID().toString(); + + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + * Shutdown MiniDFSCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + MiniOzoneCluster getCluster() { + return cluster; + } + + String getVolumeName() { + return volumeName; + } + + String getBucketName() { + return bucketName; + } + + KeyInputStream getKeyInputStream(String keyName) throws IOException { + return (KeyInputStream) objectStore + .getVolume(volumeName) + .getBucket(bucketName) + .readKey(keyName).getInputStream(); + } + + String getNewKeyName() { + return UUID.randomUUID().toString(); + } + + byte[] writeKey(String keyName, int dataLength) throws Exception { + OzoneOutputStream key = TestHelper.createKey(keyName, + ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); + + byte[] inputData = ContainerTestHelper.getFixedLengthString( + keyString, dataLength).getBytes(UTF_8); + key.write(inputData); + key.close(); + + return inputData; + } + + byte[] writeRandomBytes(String keyName, int dataLength) + throws Exception { + OzoneOutputStream key = TestHelper.createKey(keyName, + ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); + + byte[] inputData = new byte[dataLength]; + RAND.nextBytes(inputData); + key.write(inputData); + key.close(); + + return inputData; + } + + void validateData(byte[] inputData, int offset, byte[] readData) { + int readDataLen = readData.length; + byte[] expectedData = new byte[readDataLen]; + System.arraycopy(inputData, (int) offset, expectedData, 0, readDataLen); + + for (int i=0; i < readDataLen; i++) { + Assert.assertEquals("Read data at does not match the input data at " + + "position " + (offset + i), expectedData[i], readData[i]); + } + } + + @Test + public void testInputStreams() throws Exception { + String keyName = getNewKeyName(); + int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE) + 1; + writeRandomBytes(keyName, dataLength); + + KeyInputStream keyInputStream = 
getKeyInputStream(keyName); + + // Verify BlockStreams and ChunkStreams + int expectedNumBlockStreams = BufferUtils.getNumberOfBins( + dataLength, BLOCK_SIZE); + List blockStreams = keyInputStream.getBlockStreams(); + Assert.assertEquals(expectedNumBlockStreams, blockStreams.size()); + + int readBlockLength = 0; + for (BlockInputStream blockStream : blockStreams) { + int blockStreamLength = Math.min(BLOCK_SIZE, + dataLength - readBlockLength); + Assert.assertEquals(blockStreamLength, blockStream.getLength()); + + int expectedNumChunkStreams = + BufferUtils.getNumberOfBins(blockStreamLength, CHUNK_SIZE); + blockStream.initialize(); + List chunkStreams = blockStream.getChunkStreams(); + Assert.assertEquals(expectedNumChunkStreams, chunkStreams.size()); + + int readChunkLength = 0; + for (ChunkInputStream chunkStream : chunkStreams) { + int chunkStreamLength = Math.min(CHUNK_SIZE, + blockStreamLength - readChunkLength); + Assert.assertEquals(chunkStreamLength, chunkStream.getRemaining()); + + readChunkLength += chunkStreamLength; + } + + readBlockLength += blockStreamLength; + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java similarity index 51% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java index 8a81de0eb286..a4513754546b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java @@ -15,208 +15,48 @@ * the License. 
*/ -package org.apache.hadoop.ozone.client.rpc; +package org.apache.hadoop.ozone.client.rpc.read; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.time.Duration; import java.util.Arrays; -import java.util.Collection; + import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; -import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.ozone.container.TestHelper.countReplicas; -import static org.junit.Assert.fail; - +import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.container.TestHelper.countReplicas; +import static org.junit.Assert.fail; + /** * Tests {@link KeyInputStream}. 
*/ -@RunWith(Parameterized.class) -public class TestKeyInputStream { +public class TestKeyInputStream extends TestInputStreamBase { private static final Logger LOG = LoggerFactory.getLogger(TestKeyInputStream.class); - private MiniOzoneCluster cluster; - private OzoneConfiguration conf = new OzoneConfiguration(); - private OzoneClient client; - private ObjectStore objectStore; - private int chunkSize; - private int flushSize; - private int maxFlushSize; - private int blockSize; - private String volumeName; - private String bucketName; - private String keyString; - private ChunkLayoutTestInfo chunkLayout; - - @Parameterized.Parameters - public static Collection layouts() { - return Arrays.asList(new Object[][] { - {ChunkLayoutTestInfo.FILE_PER_CHUNK}, - {ChunkLayoutTestInfo.FILE_PER_BLOCK} - }); - } - - public TestKeyInputStream(ChunkLayoutTestInfo layout) { - this.chunkLayout = layout; - } - /** - * Create a MiniDFSCluster for testing. - *
- * Ozone is made active by setting OZONE_ENABLED = true - * - * @throws IOException - */ - @Before - public void init() throws Exception { - chunkSize = 256 * 1024 * 2; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - - OzoneClientConfig config = new OzoneClientConfig(); - config.setBytesPerChecksum(256 * 1024 * 1024); - conf.setFromObject(config); - - - conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS); - conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); - conf.setQuietMode(false); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, - StorageUnit.MB); - conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.name()); - - ReplicationManagerConfiguration repConf = - conf.getObject(ReplicationManagerConfiguration.class); - repConf.setInterval(Duration.ofSeconds(1)); - conf.setFromObject(repConf); - - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(4) - .setTotalPipelineNumLimit(5) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) - .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES) - .build(); - cluster.waitForClusterToBeReady(); - //the easiest way to create an open container is creating a key - client = OzoneClientFactory.getRpcClient(conf); - objectStore = client.getObjectStore(); - keyString = UUID.randomUUID().toString(); - volumeName = "test-key-input-stream-volume"; - bucketName = "test-key-input-stream-bucket"; - objectStore.createVolume(volumeName); - objectStore.getVolume(volumeName).createBucket(bucketName); - } - - @Rule - public Timeout timeout = Timeout.seconds(300); - - /** - * Shutdown MiniDFSCluster. - */ - @After - public void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - private String getKeyName() { - return UUID.randomUUID().toString(); - } - - - @Test - public void testSeekRandomly() throws Exception { - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); - - // write data of more than 2 blocks. - int dataLength = (2 * blockSize) + (chunkSize); - - byte[] inputData = writeRandomBytes(key, dataLength); - - KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream(); - - // Seek to some where end. - validate(keyInputStream, inputData, dataLength-200, 100); - - // Now seek to start. - validate(keyInputStream, inputData, 0, 140); - - validate(keyInputStream, inputData, 200, 300); - - validate(keyInputStream, inputData, 30, 500); - - randomSeek(dataLength, keyInputStream, inputData); - - // Read entire key. - validate(keyInputStream, inputData, 0, dataLength); - - // Repeat again and check. 
- randomSeek(dataLength, keyInputStream, inputData); - - validate(keyInputStream, inputData, 0, dataLength); - - keyInputStream.close(); + public TestKeyInputStream(ChunkLayOutVersion layout) { + super(layout); } /** @@ -260,18 +100,42 @@ private void validate(KeyInputStream keyInputStream, byte[] inputData, long seek, int readLength) throws Exception { keyInputStream.seek(seek); - byte[] expectedData = new byte[readLength]; - keyInputStream.read(expectedData, 0, readLength); + byte[] readData = new byte[readLength]; + keyInputStream.read(readData, 0, readLength); + + validateData(inputData, (int) seek, readData); + } + + @Test + public void testSeekRandomly() throws Exception { + String keyName = getNewKeyName(); + int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE); + byte[] inputData = writeRandomBytes(keyName, dataLength); - byte[] dest = new byte[readLength]; + KeyInputStream keyInputStream = getKeyInputStream(keyName); - System.arraycopy(inputData, (int)seek, dest, 0, readLength); + // Seek to some where end. + validate(keyInputStream, inputData, dataLength-200, 100); - for (int i=0; i < readLength; i++) { - Assert.assertEquals(expectedData[i], dest[i]); - } - } + // Now seek to start. + validate(keyInputStream, inputData, 0, 140); + validate(keyInputStream, inputData, 200, 300); + + validate(keyInputStream, inputData, 30, 500); + + randomSeek(dataLength, keyInputStream, inputData); + + // Read entire key. + validate(keyInputStream, inputData, 0, dataLength); + + // Repeat again and check. + randomSeek(dataLength, keyInputStream, inputData); + + validate(keyInputStream, inputData, 0, dataLength); + + keyInputStream.close(); + } @Test public void testSeek() throws Exception { @@ -283,23 +147,15 @@ public void testSeek() throws Exception { long readChunkCount = metrics.getContainerOpCountMetrics( ContainerProtos.Type.ReadChunk); - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); - + String keyName = getNewKeyName(); // write data spanning 3 chunks - int dataLength = (2 * chunkSize) + (chunkSize / 2); - byte[] inputData = ContainerTestHelper.getFixedLengthString( - keyString, dataLength).getBytes(UTF_8); - key.write(inputData); - key.close(); + int dataLength = (2 * CHUNK_SIZE) + (CHUNK_SIZE / 2); + byte[] inputData = writeKey(keyName, dataLength); Assert.assertEquals(writeChunkCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream(); + KeyInputStream keyInputStream = getKeyInputStream(keyName); // Seek to position 150 keyInputStream.seek(150); @@ -310,8 +166,8 @@ public void testSeek() throws Exception { Assert.assertEquals(readChunkCount, metrics .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk)); - byte[] readData = new byte[chunkSize]; - keyInputStream.read(readData, 0, chunkSize); + byte[] readData = new byte[CHUNK_SIZE]; + keyInputStream.read(readData, 0, CHUNK_SIZE); // Since we reading data from index 150 to 250 and the chunk boundary is // 100 bytes, we need to read 2 chunks. @@ -322,29 +178,25 @@ public void testSeek() throws Exception { // Verify that the data read matches with the input data at corresponding // indices. 
- for (int i = 0; i < chunkSize; i++) { - Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]); + for (int i = 0; i < CHUNK_SIZE; i++) { + Assert.assertEquals(inputData[CHUNK_SIZE + 50 + i], readData[i]); } } @Test public void testReadChunk() throws Exception { - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); + String keyName = getNewKeyName(); // write data spanning multiple blocks/chunks - int dataLength = 2 * blockSize + (blockSize / 2); - byte[] data = writeRandomBytes(key, dataLength); + int dataLength = 2 * BLOCK_SIZE + (BLOCK_SIZE / 2); + byte[] data = writeRandomBytes(keyName, dataLength); // read chunk data - try (KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream()) { + try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) { - int[] bufferSizeList = {chunkSize / 4, chunkSize / 2, chunkSize - 1, - chunkSize, chunkSize + 1, blockSize - 1, blockSize, blockSize + 1, - blockSize * 2}; + int[] bufferSizeList = {BYTES_PER_CHECKSUM + 1, CHUNK_SIZE / 4, + CHUNK_SIZE / 2, CHUNK_SIZE - 1, CHUNK_SIZE, CHUNK_SIZE + 1, + BLOCK_SIZE - 1, BLOCK_SIZE, BLOCK_SIZE + 1, BLOCK_SIZE * 2}; for (int bufferSize : bufferSizeList) { assertReadFully(data, keyInputStream, bufferSize, 0); keyInputStream.seek(0); @@ -362,39 +214,35 @@ public void testSkip() throws Exception { long readChunkCount = metrics.getContainerOpCountMetrics( ContainerProtos.Type.ReadChunk); - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); - + String keyName = getNewKeyName(); // write data spanning 3 chunks - int dataLength = (2 * chunkSize) + (chunkSize / 2); - byte[] inputData = ContainerTestHelper.getFixedLengthString( - keyString, dataLength).getBytes(UTF_8); - key.write(inputData); - key.close(); + int dataLength = (2 * CHUNK_SIZE) + (CHUNK_SIZE / 2); + byte[] inputData = writeKey(keyName, dataLength); Assert.assertEquals(writeChunkCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream(); + KeyInputStream keyInputStream = getKeyInputStream(keyName); // skip 150 - keyInputStream.skip(70); + long skipped = keyInputStream.skip(70); + Assert.assertEquals(70, skipped); Assert.assertEquals(70, keyInputStream.getPos()); - keyInputStream.skip(0); + + skipped = keyInputStream.skip(0); + Assert.assertEquals(0, skipped); Assert.assertEquals(70, keyInputStream.getPos()); - keyInputStream.skip(80); + skipped = keyInputStream.skip(80); + Assert.assertEquals(80, skipped); Assert.assertEquals(150, keyInputStream.getPos()); // Skip operation should not result in any readChunk operation. Assert.assertEquals(readChunkCount, metrics .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk)); - byte[] readData = new byte[chunkSize]; - keyInputStream.read(readData, 0, chunkSize); + byte[] readData = new byte[CHUNK_SIZE]; + keyInputStream.read(readData, 0, CHUNK_SIZE); // Since we reading data from index 150 to 250 and the chunk boundary is // 100 bytes, we need to read 2 chunks. @@ -405,8 +253,8 @@ public void testSkip() throws Exception { // Verify that the data read matches with the input data at corresponding // indices. 
- for (int i = 0; i < chunkSize; i++) { - Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]); + for (int i = 0; i < CHUNK_SIZE; i++) { + Assert.assertEquals(inputData[CHUNK_SIZE + 50 + i], readData[i]); } } @@ -421,22 +269,19 @@ public void readAfterReplicationWithUnbuffering() throws Exception { } private void testReadAfterReplication(boolean doUnbuffer) throws Exception { - Assume.assumeTrue(cluster.getHddsDatanodes().size() > 3); - - int dataLength = 2 * chunkSize; - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, dataLength, objectStore, volumeName, bucketName); + Assume.assumeTrue(getCluster().getHddsDatanodes().size() > 3); - byte[] data = writeRandomBytes(key, dataLength); + int dataLength = 2 * CHUNK_SIZE; + String keyName = getNewKeyName(); + byte[] data = writeRandomBytes(keyName, dataLength); - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(getVolumeName()) + .setBucketName(getBucketName()) .setKeyName(keyName) .setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.THREE) .build(); - OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + OmKeyInfo keyInfo = getCluster().getOzoneManager().lookupKey(keyArgs); OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); Assert.assertNotNull(locations); @@ -444,16 +289,14 @@ private void testReadAfterReplication(boolean doUnbuffer) throws Exception { Assert.assertEquals(1, locationInfoList.size()); OmKeyLocationInfo loc = locationInfoList.get(0); long containerID = loc.getContainerID(); - Assert.assertEquals(3, countReplicas(containerID, cluster)); + Assert.assertEquals(3, countReplicas(containerID, getCluster())); - TestHelper.waitForContainerClose(cluster, containerID); + TestHelper.waitForContainerClose(getCluster(), containerID); List pipelineNodes = loc.getPipeline().getNodes(); // read chunk data - try (KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName) - .readKey(keyName).getInputStream()) { + try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) { int b = keyInputStream.read(); Assert.assertNotEquals(-1, b); @@ -462,7 +305,7 @@ private void testReadAfterReplication(boolean doUnbuffer) throws Exception { keyInputStream.unbuffer(); } - cluster.shutdownHddsDatanode(pipelineNodes.get(0)); + getCluster().shutdownHddsDatanode(pipelineNodes.get(0)); // check that we can still read it assertReadFully(data, keyInputStream, dataLength - 1, 1); @@ -482,7 +325,7 @@ private HddsProtos.NodeState getNodeHealth(DatanodeDetails dn) { HddsProtos.NodeState health = null; try { NodeManager nodeManager = - cluster.getStorageContainerManager().getScmNodeManager(); + getCluster().getStorageContainerManager().getScmNodeManager(); health = nodeManager.getNodeStatus(dn).getHealth(); } catch (NodeNotFoundException e) { fail("Unexpected NodeNotFound exception"); @@ -490,17 +333,7 @@ private HddsProtos.NodeState getNodeHealth(DatanodeDetails dn) { return health; } - private byte[] writeRandomBytes(OutputStream key, int size) - throws IOException { - byte[] data = new byte[size]; - Random r = new Random(); - r.nextBytes(data); - key.write(data); - key.close(); - return data; - } - - private static void assertReadFully(byte[] data, InputStream in, + private void assertReadFully(byte[] data, InputStream in, int bufferSize, int totalRead) throws 
IOException { byte[] buffer = new byte[bufferSize]; @@ -518,5 +351,4 @@ private static void assertReadFully(byte[] data, InputStream in, } Assert.assertEquals(data.length, totalRead); } - } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 7c8ff9014a8c..5c467a07d773 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -103,7 +103,8 @@ public void testAllocateWrite() throws Exception { "data123".getBytes(UTF_8), null); ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, blockID, null); - String readData = response.getData().getData().toStringUtf8(); + String readData = response.getData().getDataBuffers().getBuffersList() + .get(0).toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); } @@ -204,7 +205,8 @@ public void testReadWriteWithBCSId() throws Exception { blockID1.setBlockCommitSequenceId(bcsId); ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, blockID1, null); - String readData = response.getData().getData().toStringUtf8(); + String readData = response.getData().getDataBuffers().getBuffersList() + .get(0).toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java index 7300fa5441eb..c06914a4b4a5 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java @@ -172,9 +172,13 @@ private void readReference() throws IOException { xceiverClientSpi.sendCommand(request); checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize); - checksumReference = checksum.computeChecksum( - response.getReadChunk().getData().toByteArray() - ); + if (response.getReadChunk().hasData()) { + checksumReference = checksum.computeChecksum( + response.getReadChunk().getData().toByteArray()); + } else { + checksumReference = checksum.computeChecksum( + response.getReadChunk().getDataBuffers().getBuffersList()); + } } @@ -221,10 +225,14 @@ private void validateChunk(long stepNo) throws Exception { ContainerProtos.ContainerCommandResponseProto response = xceiverClientSpi.sendCommand(request); - ChecksumData checksumOfChunk = - checksum.computeChecksum( - response.getReadChunk().getData().toByteArray() - ); + ChecksumData checksumOfChunk; + if (response.getReadChunk().hasData()) { + checksumOfChunk = checksum.computeChecksum( + response.getReadChunk().getData().toByteArray()); + } else { + checksumOfChunk = checksum.computeChecksum( + response.getReadChunk().getDataBuffers().getBuffersList()); + } if (!checksumReference.equals(checksumOfChunk)) { throw new IllegalStateException( diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index 7ffebecbfc6c..31c87366d1d8 100644 --- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -203,7 +203,8 @@ private ContainerCommandRequestProto getReadChunkCommand( ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .setChunkData(getChunkInfo(blockID, chunkName)); + .setChunkData(getChunkInfo(blockID, chunkName)) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto .newBuilder(); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java index 76cd0372791b..88b5613a892f 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java @@ -139,7 +139,7 @@ public void writeMultipleFiles(BenchmarkState state, Blackhole sink) public void writeSingleFile(BenchmarkState state, Blackhole sink) throws StorageContainerException { - ChunkManager chunkManager = new FilePerBlockStrategy(true); + ChunkManager chunkManager = new FilePerBlockStrategy(true, null); benchmark(chunkManager, FILE_PER_BLOCK, state, sink); }
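A minimal sketch of how a caller can handle both response versions defined above, using only the generated accessors that already appear in this patch (hasData(), getData(), getDataBuffers().getBuffersList()); the class and method names are illustrative and not part of the patch:

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

final class ReadChunkResponses {

  private ReadChunkResponses() {
  }

  // Returns the chunk payload whether the datanode replied with the V0
  // single "data" field or the V1 "dataBuffers" list.
  static ByteString getChunkData(
      ContainerProtos.ReadChunkResponseProto response) {
    if (response.hasData()) {
      // V0: the whole chunk arrives as one ByteString.
      return response.getData();
    }
    // V1: concatenate the buffers (normally one per checksum boundary).
    ByteString data = ByteString.EMPTY;
    for (ByteString buffer : response.getDataBuffers().getBuffersList()) {
      data = data.concat(buffer);
    }
    return data;
  }
}

This mirrors the branching in DatanodeChunkValidator above, which computes the reference checksum either from the single buffer or from the buffer list.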