Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ private void validateChunkChecksumData(ChunkBuffer data, ChunkInfo info)
throws StorageContainerException {
if (validateChunkChecksumData) {
try {
Checksum.verifyChecksum(data, info.getChecksumData(), 0);
Checksum.verifyChecksum(data.duplicate(data.position(), data.limit()), info.getChecksumData(), 0);
} catch (OzoneChecksumException ex) {
throw ChunkUtils.wrapInStorageContainerException(ex);
}
Expand Down Expand Up @@ -857,9 +857,9 @@ ContainerCommandResponseProto handlePutSmallFile(

// chunks will be committed as a part of handling putSmallFile
// here. There is no need to maintain this info in openContainerBlockMap.
validateChunkChecksumData(data, chunkInfo);
chunkManager
.writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
validateChunkChecksumData(data, chunkInfo);
chunkManager.finishWriteChunks(kvContainer, blockData);

List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
Expand All @@ -39,13 +40,16 @@
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
Expand Down Expand Up @@ -164,6 +168,72 @@ public void testContainerCloseActionWhenFull(
}
}

@Test
public void testSmallFileChecksum() throws IOException {
String testDirPath = testDir.getPath();
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setChunkDataValidationCheck(true);
conf.setFromObject(dnConf);
DatanodeDetails dd = randomDatanodeDetails();
HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);

ContainerCommandResponseProto smallFileResponse =
hddsDispatcher.dispatch(newPutSmallFile(1L, 1L), null);

assertEquals(ContainerProtos.Result.SUCCESS, smallFileResponse.getResult());
} finally {
ContainerMetrics.remove();
}
}

@Test
public void testWriteChunkChecksum() throws IOException {
String testDirPath = testDir.getPath();
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setChunkDataValidationCheck(true);
conf.setFromObject(dnConf);
DatanodeDetails dd = randomDatanodeDetails();
HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
//Send a few WriteChunkRequests
ContainerCommandResponseProto response;
ContainerCommandRequestProto writeChunkRequest0 = getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 0);
hddsDispatcher.dispatch(writeChunkRequest0, null);
hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 1), null);
response = hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 2), null);

assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
// Send Read Chunk request for written chunk.
response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), null);
assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());

ByteString responseData = BufferUtils.concatByteStrings(
response.getReadChunk().getDataBuffers().getBuffersList());
assertEquals(writeChunkRequest0.getWriteChunk().getData(),
responseData);

// Test checksum on Read:
final DispatcherContext context = DispatcherContext
.newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA)
.build();
response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), context);
assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
} finally {
ContainerMetrics.remove();
}
}

@ContainerLayoutTestInfo.ContainerTest
public void testContainerCloseActionWhenVolumeFull(
ContainerLayoutVersion layoutVersion) throws Exception {
Expand Down Expand Up @@ -512,6 +582,84 @@ private ContainerCommandRequestProto getWriteChunkRequest(
.build();
}

static ChecksumData checksum(ByteString data) {
try {
return new Checksum(ContainerProtos.ChecksumType.CRC32, 256)
.computeChecksum(data.asReadOnlyByteBuffer());
} catch (OzoneChecksumException e) {
throw new IllegalStateException(e);
}
}

private ContainerCommandRequestProto getWriteChunkRequest0(
String datanodeId, Long containerId, Long localId, int chunkNum) {
final int lenOfBytes = 32;
ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32));

ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
.newBuilder()
.setChunkName(
DigestUtils.md5Hex("dummy-key") + "_stream_"
+ containerId + "_chunk_" + localId)
.setOffset((long) chunkNum * lenOfBytes)
.setLen(lenOfBytes)
.setChecksumData(checksum(chunkData).getProtoBufMessage())
.build();

WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(new BlockID(containerId, localId)
.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(chunkData);

return ContainerCommandRequestProto
.newBuilder()
.setContainerID(containerId)
.setCmdType(ContainerProtos.Type.WriteChunk)
.setDatanodeUuid(datanodeId)
.setWriteChunk(writeChunkRequest)
.build();
}

static ContainerCommandRequestProto newPutSmallFile(Long containerId, Long localId) {
ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32));
return newPutSmallFile(new BlockID(containerId, localId), chunkData);
}

static ContainerCommandRequestProto newPutSmallFile(
BlockID blockID, ByteString data) {
final ContainerProtos.BlockData.Builder blockData
= ContainerProtos.BlockData.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
final ContainerProtos.PutBlockRequestProto.Builder putBlockRequest
= ContainerProtos.PutBlockRequestProto.newBuilder()
.setBlockData(blockData);
final ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue.newBuilder()
.setKey("OverWriteRequested")
.setValue("true")
.build();
final ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk")
.setOffset(0)
.setLen(data.size())
.addMetadata(keyValue)
.setChecksumData(checksum(data).getProtoBufMessage())
.build();
final ContainerProtos.PutSmallFileRequestProto putSmallFileRequest
= ContainerProtos.PutSmallFileRequestProto.newBuilder()
.setChunkInfo(chunk)
.setBlock(putBlockRequest)
.setData(data)
.build();
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(UUID.randomUUID().toString())
.setPutSmallFile(putSmallFileRequest)
.build();
}

/**
* Creates container read chunk request using input container write chunk
* request.
Expand Down