Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.util.Canceler;
Expand All @@ -45,7 +46,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
Expand Down Expand Up @@ -421,8 +421,8 @@ private static ScanResult verifyChecksum(BlockData block,
" for block %s",
ChunkInfo.getFromProtoBuf(chunk),
i,
Arrays.toString(expected.toByteArray()),
Arrays.toString(actual.toByteArray()),
StringUtils.bytes2Hex(expected.asReadOnlyByteBuffer()),
StringUtils.bytes2Hex(actual.asReadOnlyByteBuffer()),
block.getBlockID());
return ScanResult.unhealthy(
ScanResult.FailureType.CORRUPT_CHUNK, chunkFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.hadoop.hdds.scm.ha;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisResponseProto;
import org.apache.hadoop.hdds.scm.ha.io.CodecFactory;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
* Represents the response from RatisServer.
Expand Down Expand Up @@ -72,13 +73,11 @@ public static Message encode(final Object result)
}

final Class<?> type = result.getClass();
final ByteString value = CodecFactory.getCodec(type).serialize(result);

final SCMRatisResponseProto response = SCMRatisResponseProto.newBuilder()
.setType(type.getName()).setValue(value).build();
return Message.valueOf(
org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
response.toByteArray()));
.setType(type.getName())
.setValue(CodecFactory.getCodec(type).serialize(result))
.build();
return Message.valueOf(UnsafeByteOperations.unsafeWrap(response.toByteString().asReadOnlyByteBuffer()));
}

public static SCMRatisResponse decode(RaftClientReply reply)
Expand All @@ -87,14 +86,13 @@ public static SCMRatisResponse decode(RaftClientReply reply)
return new SCMRatisResponse(reply.getException());
}

final byte[] response = reply.getMessage().getContent().toByteArray();
final ByteString response = reply.getMessage().getContent();

if (response.length == 0) {
if (response.isEmpty()) {
return new SCMRatisResponse();
}

final SCMRatisResponseProto responseProto = SCMRatisResponseProto
.parseFrom(response);
final SCMRatisResponseProto responseProto = SCMRatisResponseProto.parseFrom(response.toByteArray());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use the below to avoid creating new buffer too.

final SCMRatisResponseProto responseProto = SCMRatisResponseProto.parseFrom(response.newInput());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duongkame , thanks for reviewing this!

The method parseFrom(InputStream) will call new CodedInputStream(InputStream), which creates a byte[] in the CodedInputStream constructor. Since it is InputStream, it probably will copy at least two times.


try {
final Class<?> type = ReflectionUtil.getClass(responseProto.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.util.DataChecksum;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.List;


Expand All @@ -42,8 +43,8 @@ public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
private static final Logger LOG =
LoggerFactory.getLogger(ECBlockChecksumComputer.class);

private List<ContainerProtos.ChunkInfo> chunkInfoList;
private OmKeyInfo keyInfo;
private final List<ContainerProtos.ChunkInfo> chunkInfoList;
private final OmKeyInfo keyInfo;


public ECBlockChecksumComputer(
Expand All @@ -68,7 +69,7 @@ public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)

}

private void computeMd5Crc() throws IOException {
private void computeMd5Crc() {
Preconditions.checkArgument(chunkInfoList.size() > 0);

final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
Expand All @@ -77,32 +78,28 @@ private void computeMd5Crc() throws IOException {
// Total parity checksum bytes per stripe to remove
int parityBytes = getParityBytes(chunkSize, bytesPerCrc);

ByteArrayOutputStream out = new ByteArrayOutputStream();
final MessageDigest digester = MD5Hash.getDigester();

for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
ByteString stripeChecksum = chunkInfo.getStripeChecksum();

Preconditions.checkNotNull(stripeChecksum);
byte[] checksumBytes = stripeChecksum.toByteArray();

Preconditions.checkArgument(checksumBytes.length % 4 == 0,
final int checksumSize = stripeChecksum.size();
Preconditions.checkArgument(checksumSize % 4 == 0,
"Checksum Bytes size does not match");

ByteBuffer byteWrap = ByteBuffer
.wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
byte[] currentChecksum = new byte[4];

while (byteWrap.hasRemaining()) {
byteWrap.get(currentChecksum);
out.write(currentChecksum);
}
final ByteBuffer byteWrap = stripeChecksum.asReadOnlyByteBuffer();
byteWrap.limit(checksumSize - parityBytes);
digester.update(byteWrap);
}

MD5Hash fileMD5 = MD5Hash.digest(out.toByteArray());
setOutBytes(fileMD5.getDigest());
final byte[] fileMD5 = digester.digest();
setOutBytes(digester.digest());

LOG.debug("Number of chunks={}, md5hash={}",
chunkInfoList.size(), fileMD5);
if (LOG.isDebugEnabled()) {
LOG.debug("Number of chunks={}, md5hash={}",
chunkInfoList.size(), StringUtils.bytes2HexString(fileMD5));
}
}

private void computeCompositeCrc() throws IOException {
Expand Down Expand Up @@ -149,17 +146,15 @@ private void computeCompositeCrc() throws IOException {
ByteString stripeChecksum = chunkInfo.getStripeChecksum();

Preconditions.checkNotNull(stripeChecksum);
byte[] checksumBytes = stripeChecksum.toByteArray();

Preconditions.checkArgument(checksumBytes.length % 4 == 0,
final int checksumSize = stripeChecksum.size();
Preconditions.checkArgument(checksumSize % 4 == 0,
"Checksum Bytes size does not match");
CrcComposer chunkCrcComposer =
CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);

// Limit parity bytes as they do not contribute to fileChecksum
ByteBuffer byteWrap = ByteBuffer
.wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
byte[] currentChecksum = new byte[4];
final ByteBuffer byteWrap = stripeChecksum.asReadOnlyByteBuffer();
byteWrap.limit(checksumSize - parityBytes);

long chunkOffsetIndex = 1;
while (byteWrap.hasRemaining()) {
Expand All @@ -177,8 +172,7 @@ private void computeCompositeCrc() throws IOException {
currentChunkOffset = bytesPerCrcOffset;
}

byteWrap.get(currentChecksum);
int checksumDataCrc = CrcUtil.readInt(currentChecksum, 0);
final int checksumDataCrc = byteWrap.getInt();
//To handle last chunk when it size is lower than 1524K in the case
// of rs-3-2-1524k.
long chunkSizePerChecksum = Math.min(Math.min(keySize, bytesPerCrc),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.List;

/**
Expand All @@ -39,7 +40,13 @@ public class ReplicatedBlockChecksumComputer extends
private static final Logger LOG =
LoggerFactory.getLogger(ReplicatedBlockChecksumComputer.class);

private List<ContainerProtos.ChunkInfo> chunkInfoList;
static MD5Hash digest(ByteBuffer data) {
final MessageDigest digester = MD5Hash.getDigester();
digester.update(data);
return new MD5Hash(digester.digest());
}

private final List<ContainerProtos.ChunkInfo> chunkInfoList;

public ReplicatedBlockChecksumComputer(
List<ContainerProtos.ChunkInfo> chunkInfoList) {
Expand All @@ -62,20 +69,20 @@ public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
}

// compute the block checksum, which is the md5 of chunk checksums
private void computeMd5Crc() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

private void computeMd5Crc() {
ByteString bytes = ByteString.EMPTY;
for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
ContainerProtos.ChecksumData checksumData =
chunkInfo.getChecksumData();
List<ByteString> checksums = checksumData.getChecksumsList();

for (ByteString checksum : checksums) {
baos.write(checksum.toByteArray());
bytes = bytes.concat(checksum);
}
}

MD5Hash fileMD5 = MD5Hash.digest(baos.toByteArray());
final MD5Hash fileMD5 = digest(bytes.asReadOnlyByteBuffer());

setOutBytes(fileMD5.getDigest());

LOG.debug("number of chunks={}, md5out={}",
Expand Down Expand Up @@ -121,7 +128,7 @@ private void computeCompositeCrc() throws IOException {
Preconditions.checkArgument(remainingChunkSize <=
checksums.size() * chunkSize);
for (ByteString checksum : checksums) {
int checksumDataCrc = CrcUtil.readInt(checksum.toByteArray(), 0);
final int checksumDataCrc = checksum.asReadOnlyByteBuffer().getInt();
chunkCrcComposer.update(checksumDataCrc,
Math.min(bytesPerCrc, remainingChunkSize));
remainingChunkSize -= bytesPerCrc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private ChecksumData computeChecksum(ContainerCommandResponseProto response)
throws OzoneChecksumException {
ContainerProtos.ReadChunkResponseProto readChunk = response.getReadChunk();
if (readChunk.hasData()) {
return checksum.computeChecksum(readChunk.getData().toByteArray());
return checksum.computeChecksum(readChunk.getData().asReadOnlyByteBuffer());
} else {
return checksum.computeChecksum(
readChunk.getDataBuffers().getBuffersList());
Expand Down