Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -143,12 +149,133 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
}
}

public ContainerDiff diff(KeyValueContainerData thisContainer, ContainerProtos.ContainerChecksumInfo otherInfo)
public ContainerDiff diff(KeyValueContainerData thisContainer, ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
throws IOException {
// TODO HDDS-10928 compare the checksum info of the two containers and return a summary.
// Callers can act on this summary to repair their container replica using the peer's replica.
// This method will use the read lock, which is unused in the current implementation.
return new ContainerDiff();
ContainerDiff report = new ContainerDiff();
try {
captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
Preconditions.assertNotNull(thisContainer, "Container data is null");
Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null");
Optional<ContainerProtos.ContainerChecksumInfo.Builder> thisContainerChecksumInfoBuilder =
read(thisContainer);
if (!thisContainerChecksumInfoBuilder.isPresent()) {
throw new IOException("The container #" + thisContainer.getContainerID() +
" doesn't have container checksum");
}

if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) {
throw new IOException("Container Id does not match for container " + thisContainer.getContainerID());
}

ContainerProtos.ContainerChecksumInfo thisChecksumInfo = thisContainerChecksumInfoBuilder.get().build();

ContainerProtos.ContainerMerkleTree thisMerkleTree = thisChecksumInfo.getContainerMerkleTree();
ContainerProtos.ContainerMerkleTree peerMerkleTree = peerChecksumInfo.getContainerMerkleTree();

Set<Long> thisDeletedBlockSet = new HashSet<>(thisChecksumInfo.getDeletedBlocksList());
Set<Long> peerDeletedBlockSet = new HashSet<>(peerChecksumInfo.getDeletedBlocksList());
// Common deleted blocks between two merkle trees;
thisDeletedBlockSet.retainAll(peerDeletedBlockSet);
compareContainerMerkleTree(thisMerkleTree, peerMerkleTree, thisDeletedBlockSet, report);
});
} catch (IOException ex) {
metrics.incrementMerkleTreeDiffFailures();
throw new IOException("Container Diff failed for container #" + thisContainer.getContainerID(), ex);
}

// Update Container Diff metrics based on the diff report.
if (report.isHealthy()) {
metrics.incrementHealthyContainerDiffs();
} else {
metrics.incrementUnhealthyContainerDiffs();
}
metrics.incrementMerkleTreeDiffSuccesses();
return report;
}

private void compareContainerMerkleTree(ContainerProtos.ContainerMerkleTree thisMerkleTree,
ContainerProtos.ContainerMerkleTree peerMerkleTree,
Set<Long> commonDeletedBlockSet,
ContainerDiff report) {

if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
return;
}

List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = thisMerkleTree.getBlockMerkleTreeList();
List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = peerMerkleTree.getBlockMerkleTreeList();
int thisIdx = 0, peerIdx = 0;

// Step 1: Process both lists while elements are present in both
while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < peerBlockMerkleTreeList.size()) {
ContainerProtos.BlockMerkleTree thisBlockMerkleTree = thisBlockMerkleTreeList.get(thisIdx);
ContainerProtos.BlockMerkleTree peerBlockMerkleTree = peerBlockMerkleTreeList.get(peerIdx);

if (thisBlockMerkleTree.getBlockID() == peerBlockMerkleTree.getBlockID()) {
// Matching block ID; check checksum
if (!commonDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID())
&& thisBlockMerkleTree.getBlockChecksum() != peerBlockMerkleTree.getBlockChecksum()) {
compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, report);
}
thisIdx++;
peerIdx++;
} else if (thisBlockMerkleTree.getBlockID() < peerBlockMerkleTree.getBlockID()) {
// This block's ID is smaller; advance thisIdx to catch up
thisIdx++;
} else {
// Peer block's ID is smaller; record missing block and advance peerIdx
report.addMissingBlock(peerBlockMerkleTree);
peerIdx++;
}
}

// Step 2: Process remaining blocks in the peer list
while (peerIdx < peerBlockMerkleTreeList.size()) {
report.addMissingBlock(peerBlockMerkleTreeList.get(peerIdx));
peerIdx++;
}
}

private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMerkleTree,
ContainerProtos.BlockMerkleTree peerBlockMerkleTree,
ContainerDiff report) {

List<ContainerProtos.ChunkMerkleTree> thisChunkMerkleTreeList = thisBlockMerkleTree.getChunkMerkleTreeList();
List<ContainerProtos.ChunkMerkleTree> peerChunkMerkleTreeList = peerBlockMerkleTree.getChunkMerkleTreeList();
int thisIdx = 0, peerIdx = 0;

// Step 1: Process both lists while elements are present in both
while (thisIdx < thisChunkMerkleTreeList.size() && peerIdx < peerChunkMerkleTreeList.size()) {
ContainerProtos.ChunkMerkleTree thisChunkMerkleTree = thisChunkMerkleTreeList.get(thisIdx);
ContainerProtos.ChunkMerkleTree peerChunkMerkleTree = peerChunkMerkleTreeList.get(peerIdx);

if (thisChunkMerkleTree.getOffset() == peerChunkMerkleTree.getOffset()) {
// Possible state when this Checksum != peer Checksum:
// thisTree = Healthy, peerTree = Healthy -> We don't know what is healthy. Skip.
// thisTree = Unhealthy, peerTree = Healthy -> Add to corrupt chunk.
// thisTree = Healthy, peerTree = unhealthy -> Do nothing as thisTree is healthy.
// thisTree = Unhealthy, peerTree = Unhealthy -> Do Nothing as both are corrupt.
if (thisChunkMerkleTree.getChunkChecksum() != peerChunkMerkleTree.getChunkChecksum() &&
!thisChunkMerkleTree.getIsHealthy() && peerChunkMerkleTree.getIsHealthy()) {
report.addCorruptChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
}
thisIdx++;
peerIdx++;
} else if (thisChunkMerkleTree.getOffset() < peerChunkMerkleTree.getOffset()) {
// This chunk's offset is smaller; advance thisIdx
thisIdx++;
} else {
// Peer chunk's offset is smaller; record missing chunk and advance peerIdx
report.addMissingChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
peerIdx++;
}
}

// Step 2: Process remaining chunks in the peer list
while (peerIdx < peerChunkMerkleTreeList.size()) {
report.addMissingChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTreeList.get(peerIdx));
peerIdx++;
}
}

/**
Expand Down Expand Up @@ -245,11 +372,48 @@ public static boolean checksumFileExist(Container container) {
* This class represents the difference between our replica of a container and a peer's replica of a container.
* It summarizes the operations we need to do to reconcile our replica with the peer replica it was compared to.
*
* TODO HDDS-10928
*/
public static class ContainerDiff {
private final List<ContainerProtos.BlockMerkleTree> missingBlocks;
private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> missingChunks;
private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> corruptChunks;

public ContainerDiff() {
this.missingBlocks = new ArrayList<>();
this.missingChunks = new HashMap<>();
this.corruptChunks = new HashMap<>();
}

public void addMissingBlock(ContainerProtos.BlockMerkleTree missingBlockMerkleTree) {
this.missingBlocks.add(missingBlockMerkleTree);
}

public void addMissingChunk(long blockId, ContainerProtos.ChunkMerkleTree missingChunkMerkleTree) {
this.missingChunks.computeIfAbsent(blockId, any -> new ArrayList<>()).add(missingChunkMerkleTree);
}

public void addCorruptChunk(long blockId, ContainerProtos.ChunkMerkleTree corruptChunk) {
this.corruptChunks.computeIfAbsent(blockId, any -> new ArrayList<>()).add(corruptChunk);
}

public List<ContainerProtos.BlockMerkleTree> getMissingBlocks() {
return missingBlocks;
}

public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getMissingChunks() {
return missingChunks;
}

public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getCorruptChunks() {
return corruptChunks;
}

/**
* If isHealthy is true, It means current replica is healthy. The peer replica still may have corruption,
* which it will fix when it reconciles with other peers.
*/
public boolean isHealthy() {
return missingBlocks.isEmpty() && missingChunks.isEmpty() && corruptChunks.isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.container.checksum;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
Expand Down Expand Up @@ -62,6 +63,25 @@ public void addChunks(long blockID, Collection<ContainerProtos.ChunkInfo> chunks
id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(chunks);
}

public void addChunk(long blockID, ContainerProtos.ChunkInfo chunk) {
id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunk(chunk);
}

@VisibleForTesting
public BlockMerkleTree getChunks(long blockID) {
return id2Block.get(blockID);
}

@VisibleForTesting
public BlockMerkleTree get(long blockID) {
return id2Block.get(blockID);
}

@VisibleForTesting
public BlockMerkleTree remove(long blockID) {
return id2Block.remove(blockID);
}

/**
* Uses chunk hashes to compute all remaining hashes in the tree, and returns it as a protobuf object. No checksum
* computation for the tree happens outside of this method.
Expand Down Expand Up @@ -91,7 +111,7 @@ public ContainerProtos.ContainerMerkleTree toProto() {
/**
* Represents a merkle tree for a single block within a container.
*/
private static class BlockMerkleTree {
public static class BlockMerkleTree {
// Map of each offset within the block to its chunk info.
// Chunk order in the checksum is determined by their offset.
private final SortedMap<Long, ChunkMerkleTree> offset2Chunk;
Expand All @@ -114,6 +134,30 @@ public void addChunks(Collection<ContainerProtos.ChunkInfo> chunks) {
}
}

public void addChunk(ContainerProtos.ChunkInfo chunk) {
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk));
}

public void setHealthy(long offset, boolean healthy) {
ChunkMerkleTree chunkMerkleTree = offset2Chunk.get(offset);
if (chunkMerkleTree == null) {
return;
}
chunkMerkleTree.setHealthy(healthy);
}

public ChunkMerkleTree removeChunk(long offset) {
return offset2Chunk.remove(offset);
}

public ChunkMerkleTree getChunk(long offset) {
return offset2Chunk.get(offset);
}

public long getBlockId() {
return blockID;
}

/**
* Uses chunk hashes to compute a block hash for this tree, and returns it as a protobuf object. All block checksum
* computation for the tree happens within this method.
Expand Down Expand Up @@ -150,13 +194,34 @@ public ContainerProtos.BlockMerkleTree toProto() {
* Each chunk has multiple checksums within it at each "bytesPerChecksum" interval.
* This class computes one checksum for the whole chunk by aggregating these.
*/
private static class ChunkMerkleTree {
private final ContainerProtos.ChunkInfo chunk;
public static class ChunkMerkleTree {
private ContainerProtos.ChunkInfo chunk;
private boolean isHealthy = true;

ChunkMerkleTree(ContainerProtos.ChunkInfo chunk) {
this.chunk = chunk;
}

ChunkMerkleTree(ContainerProtos.ChunkInfo chunk, boolean isHealthy) {
this.chunk = chunk;
this.isHealthy = isHealthy;
}

@VisibleForTesting
public void setChunk(ContainerProtos.ChunkInfo chunk) {
this.chunk = chunk;
}

@VisibleForTesting
public void setHealthy(boolean healthy) {
this.isHealthy = healthy;
}


public ContainerProtos.ChunkInfo getChunkInfo() {
return chunk;
}

/**
* Computes a single hash for this ChunkInfo object. All chunk level checksum computation happens within this
* method.
Expand All @@ -172,6 +237,7 @@ public ContainerProtos.ChunkMerkleTree toProto() {
return ContainerProtos.ChunkMerkleTree.newBuilder()
.setOffset(chunk.getOffset())
.setLength(chunk.getLen())
.setIsHealthy(isHealthy)
.setChunkChecksum(checksumImpl.getValue())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public static void unregister() {
@Metric(about = "Number of Merkle tree read failure")
private MutableCounterLong numMerkleTreeReadFailure;

@Metric(about = "Number of Merkle tree diff failure")
private MutableCounterLong numMerkleTreeDiffFailure;

@Metric(about = "Number of Merkle tree diff success")
private MutableCounterLong numMerkleTreeDiffSuccess;

@Metric(about = "Number of healthy container diff")
private MutableCounterLong numHealthyContainerDiff;

@Metric(about = "Number of unhealthy container diff")
private MutableCounterLong numUnhealthyContainerDiff;

@Metric(about = "Merkle tree write latency")
private MutableRate merkleTreeWriteLatencyNS;

Expand All @@ -60,6 +72,9 @@ public static void unregister() {
@Metric(about = "Merkle tree creation latency")
private MutableRate merkleTreeCreateLatencyNS;

@Metric(about = "Merkle tree diff latency")
private MutableRate merkleTreeDiffLatencyNS;

public void incrementMerkleTreeWriteFailures() {
this.numMerkleTreeWriteFailure.incr();
}
Expand All @@ -68,6 +83,21 @@ public void incrementMerkleTreeReadFailures() {
this.numMerkleTreeReadFailure.incr();
}

public void incrementMerkleTreeDiffFailures() {
this.numMerkleTreeDiffFailure.incr();
}

public void incrementMerkleTreeDiffSuccesses() {
this.numMerkleTreeDiffSuccess.incr();
}

public void incrementHealthyContainerDiffs() {
this.numHealthyContainerDiff.incr();
}
public void incrementUnhealthyContainerDiffs() {
this.numUnhealthyContainerDiff.incr();
}

public MutableRate getWriteContainerMerkleTreeLatencyNS() {
return this.merkleTreeWriteLatencyNS;
}
Expand All @@ -79,4 +109,8 @@ public MutableRate getReadContainerMerkleTreeLatencyNS() {
public MutableRate getCreateMerkleTreeLatencyNS() {
return this.merkleTreeCreateLatencyNS;
}

public MutableRate getMerkleTreeDiffLatencyNS() {
return this.merkleTreeDiffLatencyNS;
}
}
Loading