diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java
index b2c30ed9e08f..a4c24768cddc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java
@@ -45,12 +45,14 @@ public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
 
   private final List<ContainerProtos.ChunkInfo> chunkInfoList;
   private final OmKeyInfo keyInfo;
+  private final long blockLength;
 
   public ECBlockChecksumComputer(
-      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo) {
+      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo, long blockLength) {
     this.chunkInfoList = chunkInfoList;
     this.keyInfo = keyInfo;
+    this.blockLength = blockLength;
   }
 
   @Override
@@ -72,15 +74,13 @@ public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
   private void computeMd5Crc() {
     Preconditions.checkArgument(chunkInfoList.size() > 0);
 
-    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
-    long chunkSize = firstChunkInfo.getLen();
-    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
-    // Total parity checksum bytes per stripe to remove
-    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
-
     final MessageDigest digester = MD5Hash.getDigester();
 
     for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      long chunkSize = chunkInfo.getLen();
+      long bytesPerCrc = chunkInfo.getChecksumData().getBytesPerChecksum();
+      // Total parity checksum bytes per stripe to remove
+      int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
       ByteString stripeChecksum = chunkInfo.getStripeChecksum();
 
       Preconditions.checkNotNull(stripeChecksum);
@@ -121,66 +121,40 @@ private void computeCompositeCrc() throws IOException {
 
     // Bytes required to create a CRC
     long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
-    long chunkSize = firstChunkInfo.getLen();
-
-    //When EC chunk size is not a multiple of ozone.client.bytes.per.checksum
-    // (default = 16KB) the last checksum in an EC chunk is only generated for
-    // offset.
-    long bytesPerCrcOffset = chunkSize % bytesPerCrc;
-
-    long keySize = keyInfo.getDataSize();
-    // Total parity checksum bytes per stripe to remove
-    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
-
-    // Number of checksum per chunk, Eg: 2MB EC chunk will
-    // have 2 checksum per chunk.
-    int numChecksumPerChunk = (int)
-        (Math.ceil((double) chunkSize / bytesPerCrc));
+    long blockSize = blockLength;
 
     CrcComposer blockCrcComposer =
         CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
 
     for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
       ByteString stripeChecksum = chunkInfo.getStripeChecksum();
+      long chunkSize = chunkInfo.getLen();
+
+      // Total parity checksum bytes per stripe to remove
+      int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
 
       Preconditions.checkNotNull(stripeChecksum);
       final int checksumSize = stripeChecksum.size();
       Preconditions.checkArgument(checksumSize % 4 == 0,
          "Checksum Bytes size does not match");
-      CrcComposer chunkCrcComposer =
-          CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
 
       // Limit parity bytes as they do not contribute to fileChecksum
       final ByteBuffer byteWrap = stripeChecksum.asReadOnlyByteBuffer();
       byteWrap.limit(checksumSize - parityBytes);
 
-      long chunkOffsetIndex = 1;
       while (byteWrap.hasRemaining()) {
-
-        /*
-        When chunk size is not a multiple of bytes.per.crc we get an offset.
-        For eg, RS-3-2-1524k is not a multiple of 1MB. So two checksums are
-        generated 1st checksum for 1024k bytes and 2nd checksum for 500k bytes.
-        When we reach the 2nd Checksum we need to modify the bytesPerCrc as in
-        this case 500k is the bytes for which the checksum is generated.
-        */
-        long currentChunkOffset = Long.MAX_VALUE;
-        if ((chunkOffsetIndex % numChecksumPerChunk == 0)
-            && (bytesPerCrcOffset > 0)) {
-          currentChunkOffset = bytesPerCrcOffset;
+        // Here Math.min is mainly required for the last stripe's last chunk. The last chunk of the last stripe can
+        // be less than the chunkSize; chunkSize is only calculated from each stripe's first chunk. This is fine for
+        // the rest of the stripe because all the chunks are of the same size. But for the last stripe we don't know
+        // the exact size of the last chunk, so we calculate it with the help of blockSize. If the block size is
+        // smaller than the chunk size, then we know it is the last stripe's last chunk.
+        long remainingChunkSize = Math.min(blockSize, chunkSize);
+        while (byteWrap.hasRemaining() && remainingChunkSize > 0) {
+          final int checksumData = byteWrap.getInt();
+          blockCrcComposer.update(checksumData, Math.min(bytesPerCrc, remainingChunkSize));
+          remainingChunkSize -= bytesPerCrc;
         }
-
-        final int checksumDataCrc = byteWrap.getInt();
-        //To handle last chunk when it size is lower than 1524K in the case
-        // of rs-3-2-1524k.
-        long chunkSizePerChecksum = Math.min(Math.min(keySize, bytesPerCrc),
-            currentChunkOffset);
-        chunkCrcComposer.update(checksumDataCrc, chunkSizePerChecksum);
-
-        int chunkChecksumCrc = CrcUtil.readInt(chunkCrcComposer.digest(), 0);
-        blockCrcComposer.update(chunkChecksumCrc, chunkSizePerChecksum);
-        keySize -= Math.min(bytesPerCrc, currentChunkOffset);
-        ++chunkOffsetIndex;
+        blockSize -= chunkSize;
       }
     }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
index 13ba57169878..58a97b2a90d4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
@@ -102,7 +102,7 @@ private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
     setBytesPerCRC(bytesPerChecksum);
 
     ByteBuffer blockChecksumByteBuffer =
-        getBlockChecksumFromChunkChecksums(chunkInfos);
+        getBlockChecksumFromChunkChecksums(chunkInfos, keyLocationInfo.getLength());
     String blockChecksumForDebug =
         populateBlockChecksumBuf(blockChecksumByteBuffer);
 
@@ -140,10 +140,11 @@ private String populateBlockChecksumBuf(
   }
 
   private ByteBuffer getBlockChecksumFromChunkChecksums(
-      List<ContainerProtos.ChunkInfo> chunkInfos) throws IOException {
+      List<ContainerProtos.ChunkInfo> chunkInfos,
+      long blockLength) throws IOException {
 
     AbstractBlockChecksumComputer blockChecksumComputer =
-        new ECBlockChecksumComputer(chunkInfos, getKeyInfo());
+        new ECBlockChecksumComputer(chunkInfos, getKeyInfo(), blockLength);
     blockChecksumComputer.compute(getCombineMode());
 
     return blockChecksumComputer.getOutByteBuffer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileChecksum.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileChecksum.java
index 649ed50a1020..7b5a95808050 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileChecksum.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileChecksum.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -53,10 +55,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.TestDataUtil.createBucket;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Test FileChecksum API.
@@ -68,10 +73,16 @@ public class TestOzoneFileChecksum {
       true, false
   };
 
-  private static final int[] DATA_SIZES = DoubleStream.of(0.5, 1, 1.5, 2, 7, 8)
-      .mapToInt(mb -> (int) (1024 * 1024 * mb))
+  private static final int[] DATA_SIZES_1 = DoubleStream.of(0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 5, 6, 7, 8, 9, 10)
+      .mapToInt(mb -> (int) (1024 * 1024 * mb) + 510000)
       .toArray();
 
+  private static final int[] DATA_SIZES_2 = DoubleStream.of(0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 5, 6, 7, 8, 9, 10)
+      .mapToInt(mb -> (int) (1024 * 1024 * mb) + 820000)
+      .toArray();
+
+  private int[] dataSizes = new int[DATA_SIZES_1.length + DATA_SIZES_2.length];
+
   private OzoneConfiguration conf;
   private MiniOzoneCluster cluster = null;
   private FileSystem fs;
@@ -84,6 +95,8 @@ public class TestOzoneFileChecksum {
   void setup() throws IOException,
       InterruptedException, TimeoutException {
     conf = new OzoneConfiguration();
+    conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 1024 * 1024, StorageUnit.BYTES);
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 2 * 1024 * 1024, StorageUnit.BYTES);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .build();
@@ -95,9 +108,8 @@ void setup() throws IOException,
         OzoneConsts.OZONE_OFS_URI_SCHEME);
     conf.setBoolean(disableCache, true);
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
-    fs = FileSystem.get(conf);
-    ofs = (RootedOzoneFileSystem) fs;
-    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
+    System.arraycopy(DATA_SIZES_1, 0, dataSizes, 0, DATA_SIZES_1.length);
+    System.arraycopy(DATA_SIZES_2, 0, dataSizes, DATA_SIZES_1.length, DATA_SIZES_2.length);
   }
 
   @AfterEach
@@ -112,9 +124,13 @@ void teardown() {
   /**
    * Test EC checksum with Replicated checksum.
    */
   @ParameterizedTest
-  @MethodSource("missingIndexes")
-  void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
+  @MethodSource("missingIndexesAndChecksumSize")
+  void testEcFileChecksum(List<Integer> missingIndexes, double checksumSizeInMB) throws IOException {
+    conf.setInt("ozone.client.bytes.per.checksum", (int) (checksumSizeInMB * 1024 * 1024));
+    fs = FileSystem.get(conf);
+    ofs = (RootedOzoneFileSystem) fs;
+    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
     String volumeName = UUID.randomUUID().toString();
     String legacyBucket = UUID.randomUUID().toString();
     String ecBucketName = UUID.randomUUID().toString();
@@ -139,7 +155,7 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
 
     Map replicatedChecksums = new HashMap<>();
 
-    for (int dataLen : DATA_SIZES) {
+    for (int dataLen : dataSizes) {
       byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8);
 
       try (OutputStream file = adapter.createFile(volumeName + "/"
@@ -170,7 +186,7 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
       clientConf.setBoolean(OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, topologyAware);
       try (FileSystem fsForRead = FileSystem.get(clientConf)) {
 
-        for (int dataLen : DATA_SIZES) {
+        for (int dataLen : dataSizes) {
           // Compute checksum after failed DNs
           Path parent = new Path("/" + volumeName + "/" + ecBucketName + "/");
           Path ecKey = new Path(parent, "test" + dataLen);
@@ -187,14 +203,13 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
     }
   }
 
-  static Stream<List<Integer>> missingIndexes() {
+  static Stream<Arguments> missingIndexesAndChecksumSize() {
     return Stream.of(
-        ImmutableList.of(0, 1),
-        ImmutableList.of(1, 2),
-        ImmutableList.of(2, 3),
-        ImmutableList.of(3, 4),
-        ImmutableList.of(0, 3),
-        ImmutableList.of(0, 4)
-    );
+        arguments(ImmutableList.of(0, 1), 0.001),
+        arguments(ImmutableList.of(1, 2), 0.01),
+        arguments(ImmutableList.of(2, 3), 0.1),
+        arguments(ImmutableList.of(3, 4), 0.5),
+        arguments(ImmutableList.of(0, 3), 1),
+        arguments(ImmutableList.of(0, 4), 2));
   }
 }
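
Note (not part of the patch): the standalone sketch below walks through the same per-chunk accounting that the new computeCompositeCrc() loop performs, so the effect of remainingChunkSize = Math.min(blockSize, chunkSize) can be checked by hand. The class name EcChecksumWalkthrough and the concrete sizes (1 MB chunks, 820 000 bytes per checksum, a block holding two full chunks plus 300 000 bytes) are made up for illustration; the stripe-checksum buffer is not modelled, and parity checksums are assumed to be already trimmed off the way byteWrap.limit() does in the patch.

// Standalone illustration of the per-chunk CRC length accounting in the patch.
public final class EcChecksumWalkthrough {

  public static void main(String[] args) {
    final long chunkSize = 1024 * 1024;               // assumed EC chunk size (1 MB)
    final long bytesPerCrc = 820_000;                 // assumed ozone.client.bytes.per.checksum; not a divisor of chunkSize
    final long blockLength = 2 * chunkSize + 300_000; // assumed data bytes stored in this block group

    // Checksums generated per full chunk, e.g. ceil(1 MB / 820 000) = 2.
    final int checksumsPerChunk = (int) Math.ceil((double) chunkSize / bytesPerCrc);

    long blockSize = blockLength;
    long totalCredited = 0;
    int chunkIndex = 0;

    // One iteration per data chunk whose checksums appear in a stripe checksum
    // (parity checksums are assumed to be trimmed off already).
    while (blockSize > 0) {
      long remainingChunkSize = Math.min(blockSize, chunkSize);
      for (int i = 0; i < checksumsPerChunk && remainingChunkSize > 0; i++) {
        // In the patch this is blockCrcComposer.update(checksumData, Math.min(bytesPerCrc, remainingChunkSize)).
        long credited = Math.min(bytesPerCrc, remainingChunkSize);
        System.out.printf("chunk %d, checksum int %d -> credited %d bytes%n", chunkIndex, i, credited);
        totalCredited += credited;
        remainingChunkSize -= bytesPerCrc;
      }
      blockSize -= chunkSize;
      chunkIndex++;
    }

    // The credited lengths must sum to the block length, otherwise the EC composite CRC
    // would disagree with the replicated checksum computed over the same bytes.
    System.out.printf("credited %d of %d bytes%n", totalCredited, blockLength);
  }
}

Running it shows each checksum int being credited with min(bytesPerCrc, remainingChunkSize) bytes and the credited total matching the block length, which is the property the enlarged test data sizes (the +510000 and +820000 offsets) are meant to exercise.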