Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -139,16 +142,29 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del

// Although the persisted block list should already be sorted, we will sort it here to make sure.
// This will automatically fix any bugs in the persisted order that may show up.
SortedSet<Long> sortedDeletedBlockIDs = new TreeSet<>(checksumInfoBuilder.getDeletedBlocksList());
sortedDeletedBlockIDs.addAll(deletedBlockIDs);
// TODO HDDS-13245 this conversion logic will be replaced and block checksums will be populated.
// Create BlockMerkleTree to wrap each input block ID.
List<ContainerProtos.BlockMerkleTree> deletedBlocks = deletedBlockIDs.stream()
.map(blockID ->
ContainerProtos.BlockMerkleTree.newBuilder().setBlockID(blockID).build())
.collect(Collectors.toList());
// Add the original blocks to the list.
deletedBlocks.addAll(checksumInfoBuilder.getDeletedBlocksList());
// Sort and deduplicate the list.
Map<Long, ContainerProtos.BlockMerkleTree> sortedDeletedBlocks = deletedBlocks.stream()
.collect(Collectors.toMap(ContainerProtos.BlockMerkleTree::getBlockID,
Function.identity(),
(a, b) -> a,
TreeMap::new));

checksumInfoBuilder
.setContainerID(containerID)
.clearDeletedBlocks()
.addAllDeletedBlocks(sortedDeletedBlockIDs);
.addAllDeletedBlocks(sortedDeletedBlocks.values());

write(data, checksumInfoBuilder.build());
LOG.debug("Deleted block list for container {} updated with {} new blocks", data.getContainerID(),
sortedDeletedBlockIDs.size());
sortedDeletedBlocks.size());
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -196,8 +212,8 @@ private void compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo th
ContainerDiffReport report) {
ContainerProtos.ContainerMerkleTree thisMerkleTree = thisChecksumInfo.getContainerMerkleTree();
ContainerProtos.ContainerMerkleTree peerMerkleTree = peerChecksumInfo.getContainerMerkleTree();
Set<Long> thisDeletedBlockSet = new HashSet<>(thisChecksumInfo.getDeletedBlocksList());
Set<Long> peerDeletedBlockSet = new HashSet<>(peerChecksumInfo.getDeletedBlocksList());
Set<Long> thisDeletedBlockSet = getDeletedBlockIDs(thisChecksumInfo);
Set<Long> peerDeletedBlockSet = getDeletedBlockIDs(peerChecksumInfo);

if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
return;
Expand Down Expand Up @@ -274,7 +290,7 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
// thisTree = Healthy, peerTree = unhealthy -> Do nothing as thisTree is healthy.
// thisTree = Unhealthy, peerTree = Unhealthy -> Do Nothing as both are corrupt.
if (thisChunkMerkleTree.getDataChecksum() != peerChunkMerkleTree.getDataChecksum() &&
!thisChunkMerkleTree.getIsHealthy() && peerChunkMerkleTree.getIsHealthy()) {
!thisChunkMerkleTree.getChecksumMatches() && peerChunkMerkleTree.getChecksumMatches()) {
report.addCorruptChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
}
thisIdx++;
Expand Down Expand Up @@ -371,6 +387,13 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
}
}

// TODO HDDS-13245 This method will no longer be required.
private SortedSet<Long> getDeletedBlockIDs(ContainerProtos.ContainerChecksumInfoOrBuilder checksumInfo) {
return checksumInfo.getDeletedBlocksList().stream()
.map(ContainerProtos.BlockMerkleTree::getBlockID)
.collect(Collectors.toCollection(TreeSet::new));
}

/**
* Reads the container checksum info file from the disk as bytes.
* Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private static class ChunkMerkleTreeWriter {
ChunkMerkleTreeWriter(ContainerProtos.ChunkMerkleTree chunkTree) {
length = chunkTree.getLength();
offset = chunkTree.getOffset();
isHealthy = chunkTree.getIsHealthy();
isHealthy = chunkTree.getChecksumMatches();
dataChecksum = chunkTree.getDataChecksum();
}

Expand All @@ -237,7 +237,7 @@ public ContainerProtos.ChunkMerkleTree toProto() {
return ContainerProtos.ChunkMerkleTree.newBuilder()
.setOffset(offset)
.setLength(length)
.setIsHealthy(isHealthy)
.setChecksumMatches(isHealthy)
.setDataChecksum(dataChecksum)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
break;
}

if (!chunkMerkleTree.getIsHealthy()) {
if (!chunkMerkleTree.getChecksumMatches()) {
LOG.warn("Skipping chunk at offset {} in block {} of container {} from peer {} since peer reported it as " +
"unhealthy.", chunkOffset, localID, containerID, peer);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static void assertTreesSortedAndMatch(ContainerProtos.ContainerMerkleTree
assertEquals(expectedChunkTree.getOffset(), actualChunkTree.getOffset());
assertEquals(expectedChunkTree.getLength(), actualChunkTree.getLength());
assertEquals(expectedChunkTree.getDataChecksum(), actualChunkTree.getDataChecksum());
assertEquals(expectedChunkTree.getIsHealthy(), actualChunkTree.getIsHealthy());
assertEquals(expectedChunkTree.getChecksumMatches(), actualChunkTree.getChecksumMatches());
}
}
}
Expand Down Expand Up @@ -247,7 +247,7 @@ private static void introduceCorruptChunks(ContainerProtos.ContainerMerkleTree.B
ContainerProtos.ChunkMerkleTree.Builder chunkBuilder = blockBuilder.getChunkMerkleTreeBuilder(randomChunkIndex);
diff.addCorruptChunk(blockBuilder.getBlockID(), chunkBuilder.build());
chunkBuilder.setDataChecksum(chunkBuilder.getDataChecksum() + random.nextInt(1000) + 1);
chunkBuilder.setIsHealthy(false);
chunkBuilder.setChecksumMatches(false);
blockBuilder.setDataChecksum(random.nextLong());
treeBuilder.setDataChecksum(random.nextLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
Expand Down Expand Up @@ -162,7 +163,7 @@ public void testWriteOnlyDeletedBlocksToFile() throws Exception {
ContainerProtos.ContainerChecksumInfo checksumInfo = readChecksumFile(container);

assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));
ContainerProtos.ContainerMerkleTree treeProto = checksumInfo.getContainerMerkleTree();
assertEquals(0, treeProto.getDataChecksum());
assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
Expand All @@ -175,14 +176,14 @@ public void testWriteDuplicateDeletedBlocks() throws Exception {
// Pass a duplicate block, it should be filtered out.
checksumManager.markBlocksAsDeleted(container, Arrays.asList(1L, 2L, 2L, 3L));
ContainerProtos.ContainerChecksumInfo checksumInfo = readChecksumFile(container);
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));

// Blocks are expected to appear in the file deduplicated in this order.
expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L, 4L);
// Pass another set of blocks. This and the previous list passed should be joined, deduplicated, and sorted.
checksumManager.markBlocksAsDeleted(container, Arrays.asList(2L, 2L, 3L, 4L));
checksumInfo = readChecksumFile(container);
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));
}

@Test
Expand All @@ -191,7 +192,7 @@ public void testWriteBlocksOutOfOrder() throws Exception {
List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
checksumManager.markBlocksAsDeleted(container, Arrays.asList(3L, 1L, 2L));
ContainerProtos.ContainerChecksumInfo checksumInfo = readChecksumFile(container);
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));
}

@Test
Expand All @@ -210,7 +211,7 @@ public void testDeletedBlocksPreservedOnTreeWrite() throws Exception {

assertTrue(metrics.getCreateMerkleTreeLatencyNS().lastStat().total() > 0);
assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));
assertTreesSortedAndMatch(tree.toProto(), checksumInfo.getContainerMerkleTree());
}

Expand All @@ -230,7 +231,7 @@ public void testTreePreservedOnDeletedBlocksWrite() throws Exception {

assertTrue(metrics.getCreateMerkleTreeLatencyNS().lastStat().total() > 0);
assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
assertEquals(expectedBlocksToDelete, getDeletedBlockIDs(checksumInfo));
assertTreesSortedAndMatch(tree.toProto(), checksumInfo.getContainerMerkleTree());
}

Expand Down Expand Up @@ -409,7 +410,13 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
ContainerMerkleTreeWriter peerMerkleTree = buildTestTree(config);
// Introduce missing blocks in our merkle tree
ContainerProtos.ContainerMerkleTree ourMerkleTree = buildTestTreeWithMismatches(peerMerkleTree, 3, 0, 0).getLeft();
List<Long> deletedBlockList = Arrays.asList(1L, 2L, 3L, 4L, 5L);

List<ContainerProtos.BlockMerkleTree> deletedBlockList = new ArrayList<>();
List<Long> blockIDs = Arrays.asList(1L, 2L, 3L, 4L, 5L);
for (Long blockID : blockIDs) {
deletedBlockList.add(ContainerProtos.BlockMerkleTree.newBuilder().setBlockID(blockID).build());
}

// Mark all the blocks as deleted in peer merkle tree
ContainerProtos.ContainerChecksumInfo peerChecksumInfo = ContainerProtos.ContainerChecksumInfo
.newBuilder().setContainerMerkleTree(peerMerkleTree.toProto()).setContainerID(CONTAINER_ID)
Expand All @@ -426,7 +433,7 @@ void testDeletedBlocksInPeerAndBoth() throws Exception {
assertTrue(containerDiff.getMissingChunks().isEmpty());

// Delete blocks in our merkle tree as well.
checksumManager.markBlocksAsDeleted(container, deletedBlockList);
checksumManager.markBlocksAsDeleted(container, blockIDs);
checksumInfo = checksumManager.read(container);
containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo);

Expand Down Expand Up @@ -474,7 +481,13 @@ void testCorruptionInOurMerkleTreeAndDeletedBlocksInPeer() throws Exception {
ContainerMerkleTreeWriter peerMerkleTree = buildTestTree(config);
// Introduce block corruption in our merkle tree.
ContainerProtos.ContainerMerkleTree ourMerkleTree = buildTestTreeWithMismatches(peerMerkleTree, 0, 3, 3).getLeft();
List<Long> deletedBlockList = Arrays.asList(1L, 2L, 3L, 4L, 5L);

List<ContainerProtos.BlockMerkleTree> deletedBlockList = new ArrayList<>();
List<Long> blockIDs = Arrays.asList(1L, 2L, 3L, 4L, 5L);
for (Long blockID : blockIDs) {
deletedBlockList.add(ContainerProtos.BlockMerkleTree.newBuilder().setBlockID(blockID).build());
}

ContainerProtos.ContainerChecksumInfo peerChecksumInfo = ContainerProtos.ContainerChecksumInfo
.newBuilder().setContainerMerkleTree(peerMerkleTree.toProto()).setContainerID(CONTAINER_ID)
.addAllDeletedBlocks(deletedBlockList).build();
Expand All @@ -499,10 +512,17 @@ void testContainerDiffWithBlockDeletionInPeer() throws Exception {
ContainerMerkleTreeWriter dummy = buildTestTree(config, 5);
// Introduce block corruption in our merkle tree.
ContainerProtos.ContainerMerkleTree ourMerkleTree = buildTestTreeWithMismatches(dummy, 3, 3, 3).getLeft();
List<Long> deletedBlockList = Arrays.asList(6L, 7L, 8L, 9L, 10L);

List<ContainerProtos.BlockMerkleTree> deletedBlockList = new ArrayList<>();
List<Long> blockIDs = Arrays.asList(6L, 7L, 8L, 9L, 10L);
for (Long blockID : blockIDs) {
deletedBlockList.add(ContainerProtos.BlockMerkleTree.newBuilder().setBlockID(blockID).build());
}

ContainerProtos.ContainerChecksumInfo.Builder peerChecksumInfoBuilder = ContainerProtos.ContainerChecksumInfo
.newBuilder().setContainerMerkleTree(peerMerkleTree.toProto()).setContainerID(CONTAINER_ID)
.addAllDeletedBlocks(deletedBlockList);

writeContainerDataTreeProto(container, ourMerkleTree);

ContainerProtos.ContainerChecksumInfo peerChecksumInfo = peerChecksumInfoBuilder.build();
Expand All @@ -513,7 +533,7 @@ void testContainerDiffWithBlockDeletionInPeer() throws Exception {
assertFalse(containerDiff.getMissingBlocks().isEmpty());
// Missing block does not contain the deleted blocks 6L to 10L
assertFalse(containerDiff.getMissingBlocks().stream().anyMatch(any ->
deletedBlockList.contains(any.getBlockID())));
blockIDs.contains(any.getBlockID())));
assertFalse(containerDiff.getMissingBlocks().isEmpty());
assertFalse(containerDiff.getMissingChunks().isEmpty());

Expand All @@ -525,12 +545,18 @@ void testContainerDiffWithBlockDeletionInPeer() throws Exception {
assertFalse(containerDiff.getMissingBlocks().isEmpty());
// Missing block does not contain the deleted blocks 6L to 10L
assertTrue(containerDiff.getMissingBlocks().stream().anyMatch(any ->
deletedBlockList.contains(any.getBlockID())));
blockIDs.contains(any.getBlockID())));
}

@Test
public void testChecksumTreeFilePath() {
assertEquals(checksumFile.getAbsolutePath(),
ContainerChecksumTreeManager.getContainerChecksumFile(container).getAbsolutePath());
}

private List<Long> getDeletedBlockIDs(ContainerProtos.ContainerChecksumInfo checksumInfo) {
return checksumInfo.getDeletedBlocksList().stream()
.map(ContainerProtos.BlockMerkleTree::getBlockID)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private ContainerProtos.ChunkMerkleTree buildExpectedChunkTree(ContainerProtos.C
.setOffset(chunk.getOffset())
.setLength(chunk.getLen())
.setDataChecksum(computeExpectedChunkChecksum(chunk.getChecksumData().getChecksumsList()))
.setIsHealthy(isHealthy)
.setChecksumMatches(isHealthy)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -1196,11 +1197,11 @@ private void assertDeletionsInChecksumFile(ContainerData data, int numBlocks) {
}
assertNotNull(checksumInfo);

List<Long> deletedBlocks = checksumInfo.getDeletedBlocksList();
List<ContainerProtos.BlockMerkleTree> deletedBlocks = checksumInfo.getDeletedBlocksList();
assertEquals(numBlocks, deletedBlocks.size());
// Create a sorted copy of the list to check the order written to the file.
List<Long> sortedDeletedBlocks = checksumInfo.getDeletedBlocksList().stream()
.sorted()
List<ContainerProtos.BlockMerkleTree> sortedDeletedBlocks = checksumInfo.getDeletedBlocksList().stream()
.sorted(Comparator.comparingLong(ContainerProtos.BlockMerkleTree::getBlockID))
.collect(Collectors.toList());
assertNotSame(sortedDeletedBlocks, deletedBlocks);
assertEquals(sortedDeletedBlocks, deletedBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ message ChunkMerkleTree {
optional int64 offset = 1;
optional int64 length = 2;
optional int64 dataChecksum = 3;
optional bool isHealthy = 4;
optional bool checksumMatches = 4;
}

message BlockMerkleTree {
Expand All @@ -576,7 +576,7 @@ message ContainerMerkleTree {
message ContainerChecksumInfo {
optional int64 containerID = 1;
optional ContainerMerkleTree containerMerkleTree = 2;
repeated int64 deletedBlocks = 3;
repeated BlockMerkleTree deletedBlocks = 3;
}

service XceiverClientProtocolService {
Expand Down
Loading