diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 14532c550de5..01bd4db8a115 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -756,7 +756,7 @@ private void validateChunkChecksumData(ChunkBuffer data, ChunkInfo info) throws StorageContainerException { if (validateChunkChecksumData) { try { - Checksum.verifyChecksum(data, info.getChecksumData(), 0); + Checksum.verifyChecksum(data.duplicate(data.position(), data.limit()), info.getChecksumData(), 0); } catch (OzoneChecksumException ex) { throw ChunkUtils.wrapInStorageContainerException(ex); } @@ -857,9 +857,9 @@ ContainerCommandResponseProto handlePutSmallFile( // chunks will be committed as a part of handling putSmallFile // here. There is no need to maintain this info in openContainerBlockMap. + validateChunkChecksumData(data, chunkInfo); chunkManager .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); - validateChunkChecksumData(data, chunkInfo); chunkManager.finishWriteChunks(kvContainer, blockData); List chunks = new LinkedList<>(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index 95df6c647f8b..7d000f2a5fac 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -20,6 +20,7 @@ import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.BlockID; @@ -39,6 +40,8 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; @@ -46,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op; @@ -164,6 +168,72 @@ public void testContainerCloseActionWhenFull( } } + @Test + public void testSmallFileChecksum() throws IOException { + String testDirPath = testDir.getPath(); + try { + UUID scmId = UUID.randomUUID(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, testDirPath); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath); + DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); + dnConf.setChunkDataValidationCheck(true); + conf.setFromObject(dnConf); + DatanodeDetails dd = randomDatanodeDetails(); + HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf); + + ContainerCommandResponseProto smallFileResponse = + hddsDispatcher.dispatch(newPutSmallFile(1L, 1L), null); + + assertEquals(ContainerProtos.Result.SUCCESS, smallFileResponse.getResult()); + } finally { + ContainerMetrics.remove(); + } + } + + @Test + public void testWriteChunkChecksum() throws IOException { + String testDirPath = testDir.getPath(); + try { + UUID scmId = UUID.randomUUID(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, testDirPath); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath); + DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); + dnConf.setChunkDataValidationCheck(true); + conf.setFromObject(dnConf); + DatanodeDetails dd = randomDatanodeDetails(); + HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf); + //Send a few WriteChunkRequests + ContainerCommandResponseProto response; + ContainerCommandRequestProto writeChunkRequest0 = getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 0); + hddsDispatcher.dispatch(writeChunkRequest0, null); + hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 1), null); + response = hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 2), null); + + assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + // Send Read Chunk request for written chunk. + response = + hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), null); + assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + + ByteString responseData = BufferUtils.concatByteStrings( + response.getReadChunk().getDataBuffers().getBuffersList()); + assertEquals(writeChunkRequest0.getWriteChunk().getData(), + responseData); + + // Test checksum on Read: + final DispatcherContext context = DispatcherContext + .newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA) + .build(); + response = + hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), context); + assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + } finally { + ContainerMetrics.remove(); + } + } + @ContainerLayoutTestInfo.ContainerTest public void testContainerCloseActionWhenVolumeFull( ContainerLayoutVersion layoutVersion) throws Exception { @@ -512,6 +582,84 @@ private ContainerCommandRequestProto getWriteChunkRequest( .build(); } + static ChecksumData checksum(ByteString data) { + try { + return new Checksum(ContainerProtos.ChecksumType.CRC32, 256) + .computeChecksum(data.asReadOnlyByteBuffer()); + } catch (OzoneChecksumException e) { + throw new IllegalStateException(e); + } + } + + private ContainerCommandRequestProto getWriteChunkRequest0( + String datanodeId, Long containerId, Long localId, int chunkNum) { + final int lenOfBytes = 32; + ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32)); + + ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName( + DigestUtils.md5Hex("dummy-key") + "_stream_" + + containerId + "_chunk_" + localId) + .setOffset((long) chunkNum * lenOfBytes) + .setLen(lenOfBytes) + .setChecksumData(checksum(chunkData).getProtoBufMessage()) + .build(); + + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto + .newBuilder() + .setBlockID(new BlockID(containerId, localId) + .getDatanodeBlockIDProtobuf()) + .setChunkData(chunk) + .setData(chunkData); + + return ContainerCommandRequestProto + .newBuilder() + .setContainerID(containerId) + .setCmdType(ContainerProtos.Type.WriteChunk) + .setDatanodeUuid(datanodeId) + .setWriteChunk(writeChunkRequest) + .build(); + } + + static ContainerCommandRequestProto newPutSmallFile(Long containerId, Long localId) { + ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32)); + return newPutSmallFile(new BlockID(containerId, localId), chunkData); + } + + static ContainerCommandRequestProto newPutSmallFile( + BlockID blockID, ByteString data) { + final ContainerProtos.BlockData.Builder blockData + = ContainerProtos.BlockData.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()); + final ContainerProtos.PutBlockRequestProto.Builder putBlockRequest + = ContainerProtos.PutBlockRequestProto.newBuilder() + .setBlockData(blockData); + final ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue.newBuilder() + .setKey("OverWriteRequested") + .setValue("true") + .build(); + final ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder() + .setChunkName(blockID.getLocalID() + "_chunk") + .setOffset(0) + .setLen(data.size()) + .addMetadata(keyValue) + .setChecksumData(checksum(data).getProtoBufMessage()) + .build(); + final ContainerProtos.PutSmallFileRequestProto putSmallFileRequest + = ContainerProtos.PutSmallFileRequestProto.newBuilder() + .setChunkInfo(chunk) + .setBlock(putBlockRequest) + .setData(data) + .build(); + return ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.PutSmallFile) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(UUID.randomUUID().toString()) + .setPutSmallFile(putSmallFileRequest) + .build(); + } + /** * Creates container read chunk request using input container write chunk * request.