@@ -225,6 +225,14 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int ecReconstructStripeWritePoolLimit = 10 * 3;

@Config(key="ec.reconstruction.validation",
defaultValue = "false",
description = "Flag to enable validation for EC reconstruction tasks" +
" to reconstruct target containers correctly. Reconstruction tasks" +
" will fail if validation fails when enabled.",
tags = ConfigTag.CLIENT)
private boolean ecReconstructionValidation = false;

@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -509,6 +517,14 @@ public int getEcReconstructStripeWritePoolLimit() {
return ecReconstructStripeWritePoolLimit;
}

public void setEcReconstructionValidation(boolean validationEnabled) {
this.ecReconstructionValidation = validationEnabled;
}

public boolean getEcReconstructionValidation() {
return ecReconstructionValidation;
}

public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
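The new ec.reconstruction.validation flag is read through OzoneClientConfig, so it can be toggled like any other client setting. A minimal sketch of enabling it programmatically, assuming the usual ozone.client prefix for this config group (i.e. the full key ozone.client.ec.reconstruction.validation) and the standard getObject/setFromObject round trip on OzoneConfiguration:

OzoneConfiguration conf = new OzoneConfiguration();
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
// Enable checksum validation of reconstructed EC containers (defaults to false).
clientConfig.setEcReconstructionValidation(true);
conf.setFromObject(clientConfig);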
@@ -825,5 +825,4 @@ private static SortedSet<Integer> setOfRange(
return range(startInclusive, endExclusive)
.boxed().collect(toCollection(TreeSet::new));
}

}
@@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
@@ -115,6 +116,7 @@ public class ECReconstructionCoordinator implements Closeable {
private final ECReconstructionMetrics metrics;
private final StateContext context;
private final OzoneClientConfig ozoneClientConfig;
private final ECValidator ecValidator;

public ECReconstructionCoordinator(
ConfigurationSource conf, CertificateClient certificateClient,
@@ -140,6 +142,7 @@ public ECReconstructionCoordinator(
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
ecValidator = new ECValidator(ozoneClientConfig);
}

public void reconstructECContainerGroup(long containerID,
@@ -267,6 +270,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructReadExecutor,
clientConfig)) {
ecValidator.setBlockLength(blockLocationInfo.getLength());

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
@@ -291,8 +295,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
}

if (toReconstructIndexes.size() > 0) {
sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
Set<Integer> recoveryIndexes = toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet());
sis.setRecoveryIndexes(recoveryIndexes);
ecValidator.setReconstructionIndexes(recoveryIndexes);
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
@@ -337,6 +343,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
List<ECBlockOutputStream> allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams));
allStreams.addAll(Arrays.asList(emptyBlockStreams));
for (ECBlockOutputStream targetStream : allStreams) {
ecValidator.validateChecksum(targetStream, blockDataGroup);
targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
}
@@ -0,0 +1,152 @@
package org.apache.hadoop.ozone.container.ec.reconstruction;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;

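/**
* Validates reconstructed EC chunks against the stripe checksums recorded in the
* original block group. Validation is skipped unless ec.reconstruction.validation
* is enabled in {@link OzoneClientConfig}.
*/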
public class ECValidator {

private static final Logger LOG =
LoggerFactory.getLogger(ECValidator.class);
private final boolean isValidationEnabled;
private Collection<Integer> reconstructionIndexes;
private final int parityCount;
private long blockLength;
private final ECReplicationConfig ecReplicationConfig;
private int ecChunkSize;

ECValidator(OzoneClientConfig config, ECReplicationConfig ecReplConfig) {
// We fetch the configuration value beforehand to avoid re-fetching on every validation call
isValidationEnabled = config.getEcReconstructionValidation();
ecReplicationConfig = ecReplConfig;
parityCount = ecReplConfig.getParity();
ecChunkSize = ecReplConfig.getEcChunkSize();
}

public void setReconstructionIndexes(Collection<Integer> reconstructionIndexes) {
this.reconstructionIndexes = reconstructionIndexes;
}

public void setBlockLength(long blockLength) {
this.blockLength = blockLength;
}

/**
* Validate the expected checksum data for a chunk against the corresponding checksum in the original stripe checksum.
* Note: The stripe checksum is a combination of the checksums of all the chunks in the stripe.
* @param recreatedChunkChecksum Stores the {@link ContainerProtos.ChecksumData} of the recreated chunk to verify
* @param stripeChecksum Stores the {@link ByteBuffer} of the original stripe checksum
* @param chunkIndex Stores the index of the recreated chunk being compared
* @param checksumSize Stores the length of the stripe checksum in bytes
* @throws OzoneChecksumException If there is a mismatch between the recreated chunk and the stripe checksum, or if any
* internal error occurs while performing {@link ByteBuffer} operations
*/
private void validateChecksumInStripe(ContainerProtos.ChecksumData recreatedChunkChecksum,
ByteBuffer stripeChecksum, int chunkIndex, int checksumSize)
throws OzoneChecksumException {

int bytesPerChecksum = recreatedChunkChecksum.getBytesPerChecksum();
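// The 4L below assumes 4-byte checksum words (e.g. CRC32/CRC32C): each parity replica
// then contributes ceil(ecChunkSize / bytesPerChecksum) * 4 bytes to the stripe checksum.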
int parityLength = (int) (Math.ceil((double)ecChunkSize / bytesPerChecksum) * 4L * parityCount);
// Ignore the parity bits
stripeChecksum.limit(checksumSize - parityLength);
Contributor review comment:
Why are we limiting due to parity? We could be reconstructing a parity index, and it should have a checksum too. Or does the stripe checksum not contain the parity checksums? I cannot remember how this was designed, but if you are reducing the effective stripeChecksum length to remove parity, then parity is likely included in the stripe checksum.

// If we have 100 bytes per checksum and a chunk of size 1000 bytes, there are 10 checksums in total
// for each chunk that is present, and together those 10 checksums form that chunk's checksum.
// For each chunk we have:
// Checksum offset (in checksums) = (chunkIdx * numOfChecksumPerChunk)
// Number of checksums per chunk = (chunkSize / bytesPerChecksum)
// So the checksum should start from (numOfBytesPerChecksum * (chunkIdx * numOfChecksumPerChunk))

int checksumIdxStart = (ecChunkSize * chunkIndex);
Contributor review comment:
This does not align with the comment above, as we are not considering numOfBytesPerChecksum or numOfChecksumPerChunk?

Also, I am not sure about the above calculation.

What if the bytes per checksum is 100 and the chunk size is 1000, but only 80 bytes were written? In that case, we would expect a stripe (for EC-3-2) that looks like:

Index_1: 80 bytes of data, 4 bytes of checksum.
Index_2: 0 bytes
Index_3: 0 bytes
Index_4: 80 bytes of data, 4 bytes of checksum.
Index_5: 80 bytes of data, 4 bytes of checksum.

Similarly, if you have 1080 bytes written, then indexes 1 and 2 will have data, but index 2 has shorter data and a shorter checksum. The logic is different (and simpler) for a full stripe than for a partial stripe.

stripeChecksum.position(checksumIdxStart);
ByteBuffer chunkChecksum = recreatedChunkChecksum.getChecksums(0).asReadOnlyByteBuffer();
while (chunkChecksum.hasRemaining()) {
try {
int recreatedChunkChecksumByte = chunkChecksum.getInt();
int expectedStripeChecksumByte = stripeChecksum.getInt();
if (recreatedChunkChecksumByte != expectedStripeChecksumByte) {
throw new OzoneChecksumException(
String.format("Mismatch in checksum for recreated data: %s and existing stripe checksum: %s",
recreatedChunkChecksumByte, expectedStripeChecksumByte));
}
} catch (BufferUnderflowException bue) {
throw new OzoneChecksumException(
String.format("No more data to fetch from the stripe checksum at position: %s",
stripeChecksum.position()));
}
}
}

/**
* Get the block from the block group which contains the stripe checksum information.
* @param blockDataGroup An array of {@link BlockData} holding the block data of each replica index in the block group
* @return The block which contains the stripe checksum information, or null if none is found
*/
private BlockData getChecksumBlockData(BlockData[] blockDataGroup) {
BlockData checksumBlockData = null;
// Reverse traversal, as all the parity replicas will have the stripe checksum bytes
for (int i = blockDataGroup.length - 1; i >= 0; i--) {
BlockData blockData = blockDataGroup[i];
if (null == blockData) {
continue;
}

List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
if (null != chunks && !(chunks.isEmpty())) {
if (chunks.get(0).hasStripeChecksum()) {
checksumBlockData = blockData;
break;
}
}
}

return checksumBlockData;
}

/**
* Helper function to validate the checksum of the recreated data against the original stripe checksum.
* @param ecBlockOutputStream A {@link ECBlockOutputStream} instance that stores
* the block data of the reconstructed index
* @param blockDataGroup An array of {@link BlockData} for the block group, used to look up
* the original stripe checksum
* @throws OzoneChecksumException if the recreated checksum and the original stripe checksum don't match
*/
public void validateChecksum(ECBlockOutputStream ecBlockOutputStream, BlockData[] blockDataGroup)
throws OzoneChecksumException {
if (isValidationEnabled) {

// Checksum will be stored in the 1st chunk and the parity chunks
List<ContainerProtos.ChunkInfo> recreatedChunks = ecBlockOutputStream.getContainerBlockData().getChunksList();
BlockData checksumBlockData = getChecksumBlockData(blockDataGroup);
if (null == checksumBlockData) {
throw new OzoneChecksumException("Could not find checksum data in any index for blockDataGroup while validating");
}
List<ContainerProtos.ChunkInfo> checksumBlockChunks = checksumBlockData.getChunks();

for (int chunkIdx = 0; chunkIdx < recreatedChunks.size(); chunkIdx++) {
ByteString stripeChecksum = checksumBlockChunks.get(chunkIdx).getStripeChecksum();
validateChecksumInStripe(
recreatedChunks.get(chunkIdx).getChecksumData(),
stripeChecksum.asReadOnlyByteBuffer(), chunkIdx, stripeChecksum.size()
);
}
} else {
LOG.debug("Checksum validation was disabled, skipping check");
}
}
}
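To make the sizing in validateChecksumInStripe and the review discussion easier to follow, here is a small self-contained sketch of the full-stripe arithmetic, under the assumption that the stripe checksum is simply the per-replica chunk checksums (4-byte CRC32/CRC32C words) concatenated in replica-index order. The numbers are hypothetical (bytesPerChecksum = 100, ecChunkSize = 1000, EC-3-2), and partial stripes, as the review comment points out, would need different handling:

// Hypothetical full-stripe numbers, for illustration only.
int bytesPerChecksum = 100;  // checksum granularity over the data
int ecChunkSize = 1000;      // EC chunk size
int parityCount = 2;         // EC-3-2
int checksumWidth = 4;       // CRC32/CRC32C checksums are 4 bytes each

int checksumsPerChunk = (int) Math.ceil((double) ecChunkSize / bytesPerChecksum); // 10
int bytesPerChunkChecksum = checksumsPerChunk * checksumWidth;                    // 40
int parityLength = bytesPerChunkChecksum * parityCount;                           // 80, matching the parityLength formula above

// Under this layout the stripe checksum would hold 40 bytes per data replica followed by
// 80 bytes for the two parity replicas: bytes 0-39 for index 1, 40-79 for index 2,
// 80-119 for index 3, and 120-199 for indexes 4 and 5.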