diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/BaseFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/BaseFileChecksumHelper.java index b535f53b69a4..97058db3dc11 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/BaseFileChecksumHelper.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/BaseFileChecksumHelper.java @@ -147,16 +147,41 @@ private void fetchBlocks() throws IOException { .build(); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); + if (keyInfo.getFileChecksum() != null && + isFullLength(keyInfo.getDataSize())) { + // The checksum is cached in OM and the request covers the full + // file length, so reuse the cached value. + fileChecksum = keyInfo.getFileChecksum(); + } + // use OmKeyArgs to call Om.lookup() and get OmKeyInfo keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); } + /** + * Return true if the requested length covers the full file length + * (dataSize), i.e. the requested length is greater than or equal to it. + * + * @param dataSize file length + * @return true if the request covers the full file length + */ + private boolean isFullLength(long dataSize) { + return this.length >= dataSize; + } + /** * Compute file checksum given the list of chunk checksums requested earlier. + * + * Skip the computation if the checksum has already been computed, or if + * the OmKeyInfo of the key in OM carries a pre-computed checksum. * @throws IOException */ public void compute() throws IOException { + if (fileChecksum != null) { + LOG.debug("Checksum is available. Skip computing it."); + return; + } /** * request length is 0 or the file is empty, return one with the * magic entry that matches the md5 of a 32 byte zero-padded byte array. @@ -167,7 +192,7 @@ public void compute() throws IOException { final int lenOfZeroBytes = 32; byte[] emptyBlockMd5 = new byte[lenOfZeroBytes]; MD5Hash fileMD5 = MD5Hash.digest(emptyBlockMd5); - fileChecksum = new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); + fileChecksum = new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); } else { checksumBlocks(); fileChecksum = makeFinalResult(); diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java index f5beaa14ad5a..589936bc423c 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java @@ -241,6 +241,36 @@ public XceiverClientReply sendCommandAsync( FileChecksum fileChecksum = helper.getFileChecksum(); assertTrue(fileChecksum instanceof MD5MD5CRC32GzipFileChecksum); assertEquals(1, helper.getKeyLocationInfoList().size()); + + FileChecksum cachedChecksum = new MD5MD5CRC32GzipFileChecksum(); + // test cached checksum + OmKeyInfo omKeyInfoWithChecksum = new OmKeyInfo.Builder() + .setVolumeName(null) + .setBucketName(null) + .setKeyName(null) + .setOmKeyLocationInfos(Collections.singletonList( + new OmKeyLocationInfoGroup(0, omKeyLocationInfoList))) + .setCreationTime(Time.now()) + .setModificationTime(Time.now()) + .setDataSize(0) + .setReplicationConfig( + RatisReplicationConfig.getInstance( + HddsProtos.ReplicationFactor.ONE)) + .setFileEncryptionInfo(null) + .setAcls(null) + .setFileChecksum(cachedChecksum) + .build(); + when(om.lookupKey(ArgumentMatchers.any())). 
+ thenReturn(omKeyInfoWithChecksum); + + helper = new ReplicatedFileChecksumHelper( + mockVolume, bucket, "dummy", 10, combineMode, + mockRpcClient); + + helper.compute(); + fileChecksum = helper.getFileChecksum(); + assertTrue(fileChecksum instanceof MD5MD5CRC32GzipFileChecksum); + assertEquals(1, helper.getKeyLocationInfoList().size()); } private XceiverClientReply buildValidResponse() { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 4b2727642a4f..9b2014dd7b84 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -26,6 +26,7 @@ import java.util.Objects; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; @@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileChecksumProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList; import org.apache.hadoop.ozone.protocolPB.OMPBHelper; @@ -58,6 +60,7 @@ public final class OmKeyInfo extends WithParentObjectId { private long modificationTime; private ReplicationConfig replicationConfig; private FileEncryptionInfo encInfo; + private FileChecksum fileChecksum; /** * Represents leaf node name. This also will be used when the keyName is @@ -78,7 +81,7 @@ public final class OmKeyInfo extends WithParentObjectId { ReplicationConfig replicationConfig, Map<String, String> metadata, FileEncryptionInfo encInfo, List<OzoneAcl> acls, - long objectID, long updateID) { + long objectID, long updateID, FileChecksum fileChecksum) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; @@ -92,6 +95,7 @@ public final class OmKeyInfo extends WithParentObjectId { this.acls = acls; this.objectID = objectID; this.updateID = updateID; + this.fileChecksum = fileChecksum; } @SuppressWarnings("parameternumber") @@ -101,10 +105,11 @@ public final class OmKeyInfo extends WithParentObjectId { ReplicationConfig replicationConfig, Map<String, String> metadata, FileEncryptionInfo encInfo, List<OzoneAcl> acls, - long parentObjectID, long objectID, long updateID) { + long parentObjectID, long objectID, long updateID, + FileChecksum fileChecksum) { this(volumeName, bucketName, keyName, versions, dataSize, creationTime, modificationTime, replicationConfig, metadata, - encInfo, acls, objectID, updateID); + encInfo, acls, objectID, updateID, fileChecksum); this.fileName = fileName; this.parentObjectID = parentObjectID; } @@ -355,6 +360,10 @@ public void setReplicationConfig(ReplicationConfig repConfig) { this.replicationConfig = repConfig; } + public FileChecksum getFileChecksum() { + return fileChecksum; + } + /** * Builder of OmKeyInfo. */ @@ -376,6 +385,7 @@ public static class Builder { // not persisted to DB. FileName will be the last element in path keyName. 
private String fileName; private long parentObjectID; + private FileChecksum fileChecksum; public Builder() { this.metadata = new HashMap<>(); @@ -483,12 +493,17 @@ public Builder setParentObjectID(long parentID) { return this; } + public Builder setFileChecksum(FileChecksum checksum) { + this.fileChecksum = checksum; + return this; + } + public OmKeyInfo build() { return new OmKeyInfo( volumeName, bucketName, keyName, fileName, omKeyLocationInfoGroups, dataSize, creationTime, modificationTime, replicationConfig, metadata, encInfo, acls, - parentObjectID, objectID, updateID); + parentObjectID, objectID, updateID, fileChecksum); } } @@ -577,6 +592,11 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName, .setObjectID(objectID) .setUpdateID(updateID) .setParentID(parentObjectID); + + FileChecksumProto fileChecksumProto = OMPBHelper.convert(fileChecksum); + if (fileChecksumProto != null) { + kb.setFileChecksum(fileChecksumProto); + } if (StringUtils.isNotBlank(fullKeyName)) { kb.setKeyName(fullKeyName); } else { @@ -588,7 +608,7 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName, return kb.build(); } - public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) { + public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) throws IOException { if (keyInfo == null) { return null; } @@ -623,6 +643,10 @@ public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) { if (keyInfo.hasParentID()) { builder.setParentObjectID(keyInfo.getParentID()); } + if (keyInfo.hasFileChecksum()) { + FileChecksum fileChecksum = OMPBHelper.convert(keyInfo.getFileChecksum()); + builder.setFileChecksum(fileChecksum); + } // not persisted to DB. FileName will be filtered out from keyName builder.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName())); return builder.build(); @@ -638,7 +662,8 @@ public String getObjectInfo() { ", creationTime='" + creationTime + '\'' + ", objectID='" + objectID + '\'' + ", parentID='" + parentObjectID + '\'' + - ", replication='" + replicationConfig + + ", replication='" + replicationConfig + '\'' + + ", fileChecksum='" + fileChecksum + '\'' + '}'; } @@ -704,6 +729,10 @@ public OmKeyInfo copyObject() { metadata.forEach((k, v) -> builder.addMetadata(k, v)); } + if (fileChecksum != null) { + builder.setFileChecksum(fileChecksum); + } + return builder.build(); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java index cbb7f9a7d0f5..dcb7e2a2c1e3 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.helpers; +import java.io.IOException; import java.util.Objects; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto; @@ -104,7 +105,8 @@ public OzoneFileStatusProto getProtobuf(int clientVersion) { return builder.build(); } - public static OzoneFileStatus getFromProtobuf(OzoneFileStatusProto status) { + public static OzoneFileStatus getFromProtobuf(OzoneFileStatusProto status) + throws IOException { return new OzoneFileStatus( OmKeyInfo.getFromProtobuf(status.getKeyInfo()), status.getBlockSize(), diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java index 83a7184123ad..ac58d609a9cd 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.om.helpers; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -52,7 +53,7 @@ public List<OmKeyInfo> getOmKeyInfoList() { } public static RepeatedOmKeyInfo getFromProto(RepeatedKeyInfo - repeatedKeyInfo) { + repeatedKeyInfo) throws IOException { List<OmKeyInfo> list = new ArrayList<>(); for (KeyInfo k : repeatedKeyInfo.getKeyInfoList()) { list.add(OmKeyInfo.getFromProtobuf(k)); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 10ca155b13a4..70d46ee3ab53 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx; import org.apache.hadoop.ozone.om.protocol.S3Auth; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; @@ -910,10 +911,12 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName, ListKeysResponse resp = handleError(submitRequest(omRequest)).getListKeysResponse(); - keys.addAll( - resp.getKeyInfoList().stream() - .map(OmKeyInfo::getFromProtobuf) - .collect(Collectors.toList())); + List<OmKeyInfo> list = new ArrayList<>(); + for (OzoneManagerProtocolProtos.KeyInfo keyInfo : resp.getKeyInfoList()) { + OmKeyInfo fromProtobuf = OmKeyInfo.getFromProtobuf(keyInfo); + list.add(fromProtobuf); + } + keys.addAll(list); return keys; } @@ -1637,10 +1640,14 @@ public List<RepeatedOmKeyInfo> listTrash(String volumeName, List<RepeatedOmKeyInfo> deletedKeyList = new ArrayList<>(trashResponse.getDeletedKeysCount()); - deletedKeyList.addAll( - trashResponse.getDeletedKeysList().stream() - .map(RepeatedOmKeyInfo::getFromProto) - .collect(Collectors.toList())); + List<RepeatedOmKeyInfo> list = new ArrayList<>(); + for (OzoneManagerProtocolProtos.RepeatedKeyInfo + repeatedKeyInfo : trashResponse.getDeletedKeysList()) { + RepeatedOmKeyInfo fromProto = + RepeatedOmKeyInfo.getFromProto(repeatedKeyInfo); + list.add(fromProto); + } + deletedKeyList.addAll(list); return deletedKeyList; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java index a7f13073fdd7..78d3d46782db 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java @@ -20,30 +20,52 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.crypto.CipherSuite; import 
org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CompositeCrcFileChecksum; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; +import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; +import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.hdds.client.DefaultReplicationConfig; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketEncryptionInfoProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ChecksumTypeProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CipherSuiteProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CompositeCrcFileChecksumProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CryptoProtocolVersionProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileChecksumProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileChecksumTypeProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileEncryptionInfoProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MD5MD5Crc32FileChecksumProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.ozone.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.CrcUtil; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; /** * Utilities for converting protobuf classes. 
*/ public final class OMPBHelper { - + private static final Logger LOG = LoggerFactory.getLogger(OMPBHelper.class); public static final ByteString REDACTED = ByteString.copyFromUtf8("<redacted>"); @@ -180,12 +202,12 @@ public static HddsProtos.DefaultReplicationConfig convert( final HddsProtos.DefaultReplicationConfig.Builder builder = HddsProtos.DefaultReplicationConfig.newBuilder(); - builder.setType( - ReplicationType.toProto(defaultReplicationConfig.getType())); + builder.setType(ReplicationType.toProto( + defaultReplicationConfig.getType())); if (defaultReplicationConfig.getFactor() != null) { - builder.setFactor(ReplicationFactor - .toProto(defaultReplicationConfig.getFactor())); + builder.setFactor(ReplicationFactor.toProto( + defaultReplicationConfig.getFactor())); } if (defaultReplicationConfig.getEcReplicationConfig() != null) { @@ -196,6 +218,159 @@ public static HddsProtos.DefaultReplicationConfig convert( return builder.build(); } + public static FileChecksum convert(FileChecksumProto proto) + throws IOException { + if (proto == null) { + return null; + } + + switch (proto.getChecksumType()) { + case MD5CRC: + if (proto.hasMd5Crc()) { + return convertMD5MD5FileChecksum(proto.getMd5Crc()); + } + throw new IOException("The field md5Crc is not set."); + case COMPOSITE_CRC: + if (proto.hasCompositeCrc()) { + return convertCompositeCrcChecksum(proto.getCompositeCrc()); + } + throw new IOException("The field compositeCrc is not set."); + default: + throw new IOException("Unexpected checksum type " + + proto.getChecksumType()); + } + } + + public static MD5MD5CRC32FileChecksum convertMD5MD5FileChecksum( + MD5MD5Crc32FileChecksumProto proto) throws IOException { + ChecksumTypeProto checksumTypeProto = proto.getChecksumType(); + int bytesPerCRC = proto.getBytesPerCRC(); + long crcPerBlock = proto.getCrcPerBlock(); + ByteString md5 = proto.getMd5(); + DataInputStream inputStream = new DataInputStream( + new ByteArrayInputStream(md5.toByteArray())); + MD5Hash md5Hash = MD5Hash.read(inputStream); + switch (checksumTypeProto) { + case CHECKSUM_CRC32: + return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, crcPerBlock, md5Hash); + case CHECKSUM_CRC32C: + return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, crcPerBlock, + md5Hash); + default: + throw new IOException("Unexpected checksum type " + checksumTypeProto); + } + } + + public static CompositeCrcFileChecksum convertCompositeCrcChecksum( + CompositeCrcFileChecksumProto proto) throws IOException { + ChecksumTypeProto checksumTypeProto = proto.getChecksumType(); + int bytesPerCRC = proto.getBytesPerCrc(); + int crc = proto.getCrc(); + switch (checksumTypeProto) { + case CHECKSUM_CRC32: + return new CompositeCrcFileChecksum( + crc, DataChecksum.Type.CRC32, bytesPerCRC); + case CHECKSUM_CRC32C: + return new CompositeCrcFileChecksum( + crc, DataChecksum.Type.CRC32C, bytesPerCRC); + default: + throw new IOException("Unexpected checksum type " + checksumTypeProto); + } + } + + public static MD5MD5Crc32FileChecksumProto convert( + MD5MD5CRC32FileChecksum checksum) + throws IOException { + ChecksumTypeProto type; + switch (checksum.getCrcType()) { + case CRC32: + type = ChecksumTypeProto.CHECKSUM_CRC32; + break; + case CRC32C: + type = ChecksumTypeProto.CHECKSUM_CRC32C; + break; + default: + type = ChecksumTypeProto.CHECKSUM_NULL; + } + + DataOutputBuffer buf = new DataOutputBuffer(); + checksum.write(buf); + byte[] bytes = buf.getData(); + DataInputBuffer buffer = new DataInputBuffer(); + buffer.reset(bytes, 0, bytes.length); + int bytesPerCRC = 
buffer.readInt(); + long crcPerBlock = buffer.readLong(); + buffer.close(); + + int offset = Integer.BYTES + Long.BYTES; + ByteString byteString = ByteString.copyFrom( + bytes, offset, bytes.length - offset); + + return MD5MD5Crc32FileChecksumProto.newBuilder() + .setChecksumType(type) + .setBytesPerCRC(bytesPerCRC) + .setCrcPerBlock(crcPerBlock) + .setMd5(byteString) + .build(); + } + + public static CompositeCrcFileChecksumProto convert( + CompositeCrcFileChecksum checksum) + throws IOException { + ChecksumTypeProto type; + Options.ChecksumOpt opt = checksum.getChecksumOpt(); + switch (opt.getChecksumType()) { + case CRC32: + type = ChecksumTypeProto.CHECKSUM_CRC32; + break; + case CRC32C: + type = ChecksumTypeProto.CHECKSUM_CRC32C; + break; + default: + type = ChecksumTypeProto.CHECKSUM_NULL; + } + int crc = CrcUtil.readInt(checksum.getBytes(), 0); + return CompositeCrcFileChecksumProto.newBuilder() + .setChecksumType(type) + .setBytesPerCrc(opt.getBytesPerChecksum()) + .setCrc(crc) + .build(); + } + + public static FileChecksumProto convert(FileChecksum checksum) { + if (checksum == null) { + return null; + } + + try { + if (checksum instanceof MD5MD5CRC32FileChecksum) { + MD5MD5Crc32FileChecksumProto c1 = + convert((MD5MD5CRC32FileChecksum) checksum); + + return FileChecksumProto.newBuilder() + .setChecksumType(FileChecksumTypeProto.MD5CRC) + .setMd5Crc(c1) + .build(); + } else if (checksum instanceof CompositeCrcFileChecksum) { + CompositeCrcFileChecksumProto c2 = + convert((CompositeCrcFileChecksum) checksum); + + return FileChecksumProto.newBuilder() + .setChecksumType(FileChecksumTypeProto.COMPOSITE_CRC) + .setCompositeCrc(c2) + .build(); + } else { + LOG.warn("Unsupported file checksum runtime type " + + checksum.getClass().getName()); + } + } catch (IOException ioe) { + LOG.warn( + "Failed to convert a FileChecksum {} to its protobuf representation", + checksum, ioe); + } + return null; + } + public static CipherSuite convert(CipherSuiteProto proto) { switch (proto) { case AES_CTR_NOPADDING: diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java index a6a0ec9917ff..50218eaf9a43 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java @@ -34,6 +34,7 @@ import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,7 +51,7 @@ public class TestOmKeyInfo { @Test - public void protobufConversion() { + public void protobufConversion() throws IOException { OmKeyInfo key = createOmKeyInfo( RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); @@ -61,7 +62,7 @@ public void protobufConversion() { } @Test - public void getProtobufMessageEC() { + public void getProtobufMessageEC() throws IOException { OmKeyInfo key = createOmKeyInfo( RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); OzoneManagerProtocolProtos.KeyInfo omKeyProto = diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1bad21582f5e..4094a2b35264 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -814,6 +814,41 @@ message 
KeyLocationList { optional bool isMultipartKey = 4 [default = false]; } +/** + * Checksum algorithms/types used in Ozone. + * Make sure this enum's integer values match the id values of the + * corresponding org.apache.hadoop.util.DataChecksum.Type enum. + */ +enum ChecksumTypeProto { + CHECKSUM_NULL = 0; + CHECKSUM_CRC32 = 1; + CHECKSUM_CRC32C = 2; +} + +enum FileChecksumTypeProto { + MD5CRC = 1; // BlockChecksum obtained by taking the MD5 digest of chunk CRCs + COMPOSITE_CRC = 2; // Chunk-independent CRC, optionally striped +} + +message CompositeCrcFileChecksumProto { + required ChecksumTypeProto checksumType = 1; + required uint32 bytesPerCrc = 2; + required uint32 crc = 3; +} + +message MD5MD5Crc32FileChecksumProto { + required ChecksumTypeProto checksumType = 1; + required uint32 bytesPerCRC = 2; + required uint64 crcPerBlock = 3; + required bytes md5 = 4; +} + +message FileChecksumProto { + required FileChecksumTypeProto checksumType = 1 [default = COMPOSITE_CRC]; + optional CompositeCrcFileChecksumProto compositeCrc = 2; + optional MD5MD5Crc32FileChecksumProto md5Crc = 3; +} + message KeyInfo { required string volumeName = 1; required string bucketName = 2; @@ -832,6 +867,7 @@ message KeyInfo { optional uint64 updateID = 15; optional uint64 parentID = 16; optional hadoop.hdds.ECReplicationConfig ecReplicationConfig = 17; + optional FileChecksumProto fileChecksum = 18; } message DirectoryInfo { diff --git a/hadoop-ozone/interface-storage/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmKeyInfoCodec.java b/hadoop-ozone/interface-storage/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmKeyInfoCodec.java index 627906d3d232..96a53ebc4b4f 100644 --- a/hadoop-ozone/interface-storage/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmKeyInfoCodec.java +++ b/hadoop-ozone/interface-storage/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmKeyInfoCodec.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.om.codec; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; @@ -34,6 +37,7 @@ import java.util.Collections; import java.util.List; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; @@ -47,6 +51,14 @@ public class TestOmKeyInfoCodec { private static final String KEYNAME = "user/root/terasort/10G-input-6/part-m-00037"; + private static FileChecksum checksum = createEmptyChecksum(); + + private static FileChecksum createEmptyChecksum() { + final int lenOfZeroBytes = 32; + byte[] emptyBlockMd5 = new byte[lenOfZeroBytes]; + MD5Hash fileMD5 = MD5Hash.digest(emptyBlockMd5); + return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); + } private OmKeyInfo getKeyInfo(int chunkNum) { List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>(); @@ -61,6 +73,7 @@ private OmKeyInfo getKeyInfo(int chunkNum) { } OmKeyLocationInfoGroup omKeyLocationInfoGroup = new OmKeyLocationInfoGroup(0, omKeyLocationInfoList); + return new OmKeyInfo.Builder() 
.setCreationTime(Time.now()) .setModificationTime(Time.now()) @@ -74,6 +87,7 @@ private OmKeyInfo getKeyInfo(int chunkNum) { .setDataSize(100) .setOmKeyLocationInfos( Collections.singletonList(omKeyLocationInfoGroup)) + .setFileChecksum(checksum) .build(); } @@ -95,6 +109,8 @@ public void testOmKeyInfoCodecWithoutPipeline(int chunkNum) { ", Serialized key size without pipeline = " + rawData.length); assertNull(key.getLatestVersionLocations().getLocationList().get(0) .getPipeline()); + assertNotNull(key.getFileChecksum()); + assertEquals(checksum, key.getFileChecksum()); } catch (IOException e) { fail("Should success"); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 3f205a8cd85a..5bcf052ab0c4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -503,8 +503,13 @@ private long getMultipartDataSize(String requestedVolume, OMException.ResultCodes.INVALID_PART); } - OmKeyInfo currentPartKeyInfo = OmKeyInfo - .getFromProtobuf(partKeyInfo.getPartKeyInfo()); + OmKeyInfo currentPartKeyInfo = null; + try { + currentPartKeyInfo = + OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo()); + } catch (IOException ioe) { + throw new OMException(ioe, OMException.ResultCodes.INTERNAL_ERROR); + } // Except for last part all parts should have minimum size. if (currentPartCount != partsListSize) {
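Note for reviewers (not part of the patch): the sketch below is a minimal, self-contained illustration of how the pieces above fit together end to end. It builds the same empty-file checksum that BaseFileChecksumHelper.compute() produces, converts it to the new FileChecksumProto (the message persisted in KeyInfo field 18) with the OMPBHelper.convert overloads added in this patch, and converts it back, asserting the round trip is lossless. The test class and method names are hypothetical.

package org.apache.hadoop.ozone.protocolPB;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileChecksumProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FileChecksumTypeProto;
import org.junit.Test;

public class TestFileChecksumProtoConversion {

  @Test
  public void md5Crc32GzipRoundTrip() throws Exception {
    // Same "empty file" checksum the patch uses: MD5 of 32 zero bytes.
    MD5Hash md5 = MD5Hash.digest(new byte[32]);
    FileChecksum original = new MD5MD5CRC32GzipFileChecksum(0, 0, md5);

    // FileChecksum -> FileChecksumProto, as done in OmKeyInfo#getProtobuf.
    FileChecksumProto proto = OMPBHelper.convert(original);
    assertEquals(FileChecksumTypeProto.MD5CRC, proto.getChecksumType());

    // FileChecksumProto -> FileChecksum, as done in OmKeyInfo#getFromProtobuf.
    FileChecksum restored = OMPBHelper.convert(proto);
    assertEquals(original, restored);
  }
}

These are the same two convert calls that OmKeyInfo#getProtobuf and OmKeyInfo#getFromProtobuf rely on, so a lossy round trip here would surface to clients as a wrong cached checksum served by the lookupKey short-circuit in BaseFileChecksumHelper.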