diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
new file mode 100644
index 000000000000..939c6d08b336
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.utils.SimpleStriped;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class coordinates reading and writing container checksum information for all containers.
+ */
+public class ContainerChecksumTreeManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerChecksumTreeManager.class);
+
+ // Used to coordinate reads and writes to each container's checksum file.
+ // Each container ID is mapped to a stripe.
+ private final Striped<ReadWriteLock> fileLock;
+
+ /**
+ * Creates one instance that should be used to coordinate all container checksum info within a datanode.
+ */
+ public ContainerChecksumTreeManager(DatanodeConfiguration dnConf) {
+ fileLock = SimpleStriped.readWriteLock(dnConf.getContainerChecksumLockStripes(), true);
+ }
+
+ /**
+ * Writes the specified container merkle tree to the specified container's checksum file.
+ * The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
+ * file remains unchanged.
+ * Concurrent writes to the same file are coordinated internally.
+ */
+ public void writeContainerDataTree(KeyValueContainerData data, ContainerMerkleTree tree) throws IOException {
+ Lock writeLock = getWriteLock(data.getContainerID());
+ writeLock.lock();
+ try {
+ ContainerProtos.ContainerChecksumInfo newChecksumInfo = read(data).toBuilder()
+ .setContainerMerkleTree(tree.toProto())
+ .build();
+ write(data, newChecksumInfo);
+ LOG.debug("Data merkle tree for container {} updated", data.getContainerID());
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Adds the specified blocks to the list of deleted blocks specified in the container's checksum file.
+ * All other content of the file remains unchanged.
+ * Concurrent writes to the same file are coordinated internally.
+ */
+ public void markBlocksAsDeleted(KeyValueContainerData data, SortedSet<Long> deletedBlockIDs) throws IOException {
+ Lock writeLock = getWriteLock(data.getContainerID());
+ writeLock.lock();
+ try {
+ ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = read(data).toBuilder();
+ // Although the persisted block list should already be sorted, we will sort it here to make sure.
+ // This will automatically fix any bugs in the persisted order that may show up.
+ SortedSet<Long> sortedDeletedBlockIDs = new TreeSet<>(checksumInfoBuilder.getDeletedBlocksList());
+ // Since the provided list of block IDs is already sorted, this is a linear time addition.
+ sortedDeletedBlockIDs.addAll(deletedBlockIDs);
+
+ ContainerProtos.ContainerChecksumInfo newChecksumInfo = checksumInfoBuilder
+ .clearDeletedBlocks()
+ .addAllDeletedBlocks(sortedDeletedBlockIDs)
+ .build();
+ write(data, newChecksumInfo);
+ LOG.debug("Deleted block list for container {} updated", data.getContainerID());
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public ContainerDiff diff(KeyValueContainerData thisContainer, File otherContainerTree)
+ throws IOException {
+ // TODO HDDS-10928 compare the checksum info of the two containers and return a summary.
+ // Callers can act on this summary to repair their container replica using the peer's replica.
+ // This method will use the read lock once implemented; the current placeholder does not take it.
+ return new ContainerDiff();
+ }
+
+ /**
+ * Returns the container checksum tree file for the specified container without deserializing it.
+ */
+ public File getContainerChecksumFile(KeyValueContainerData data) {
+ return new File(data.getMetadataPath(), data.getContainerID() + ".tree");
+ }
+
+ private Lock getReadLock(long containerID) {
+ return fileLock.get(containerID).readLock();
+ }
+
+ private Lock getWriteLock(long containerID) {
+ return fileLock.get(containerID).writeLock();
+ }
+
+ private ContainerProtos.ContainerChecksumInfo read(KeyValueContainerData data) throws IOException {
+ long containerID = data.getContainerID();
+ Lock readLock = getReadLock(containerID);
+ readLock.lock();
+ try {
+ File checksumFile = getContainerChecksumFile(data);
+ // If the checksum file has not been created yet, return an empty instance.
+ // Since all writes happen as part of an atomic read-modify-write cycle that requires a write lock, two empty
+ // instances for the same container obtained only under the read lock will not conflict.
+ if (!checksumFile.exists()) {
+ LOG.debug("No checksum file currently exists for container {} at the path {}. Returning an empty instance.",
+ containerID, checksumFile);
+ return ContainerProtos.ContainerChecksumInfo.newBuilder()
+ .setContainerID(containerID)
+ .build();
+ }
+ try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+ return ContainerProtos.ContainerChecksumInfo.parseFrom(inStream);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void write(KeyValueContainerData data, ContainerProtos.ContainerChecksumInfo checksumInfo)
+ throws IOException {
+ Lock writeLock = getWriteLock(data.getContainerID());
+ writeLock.lock();
+ try (FileOutputStream outStream = new FileOutputStream(getContainerChecksumFile(data))) {
+ checksumInfo.writeTo(outStream);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * This class represents the difference between our replica of a container and a peer's replica of a container.
+ * It summarizes the operations we need to do to reconcile our replica with the peer replica it was compared to.
+ *
+ * TODO HDDS-10928
+ */
+ public static class ContainerDiff {
+ public ContainerDiff() {
+
+ }
+ }
+}
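For orientation, here is a minimal, hypothetical caller of the manager above. It is not part of this patch: the container data object and the merkle tree are assumed to come from the datanode's existing container handling code, and the class name is invented for the sketch.

```java
package org.apache.hadoop.ozone.container.checksum;

import java.util.Arrays;
import java.util.TreeSet;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

// Hypothetical sketch of the two write paths exposed by ContainerChecksumTreeManager.
public final class ChecksumManagerUsageSketch {
  public static void updateChecksumInfo(KeyValueContainerData containerData,
      ContainerMerkleTree tree) throws Exception {
    DatanodeConfiguration dnConf =
        new OzoneConfiguration().getObject(DatanodeConfiguration.class);
    ContainerChecksumTreeManager manager = new ContainerChecksumTreeManager(dnConf);

    // Replaces only the data merkle tree portion of the container's checksum file.
    manager.writeContainerDataTree(containerData, tree);

    // Appends to the deleted block list; the stored merkle tree is preserved.
    manager.markBlocksAsDeleted(containerData, new TreeSet<>(Arrays.asList(1L, 2L, 3L)));
  }
}
```

Both calls are internally synchronized per container, so a caller does not need its own locking around them.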
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java
new file mode 100644
index 000000000000..9eeb50b6498c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
+import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * This class represents a Merkle tree that provides one checksum for all data within a container.
+ *
+ * At the leaves of the tree, a checksum for each chunk is computed by taking a checksum of all checksums within
+ * that chunk. The chunk checksums within a block are then checksummed together to generate the block level
+ * checksum. Finally, the checksums of all blocks are checksummed together to create a container level checksum.
+ * Note that checksums are order dependent. Chunk checksums are sorted by their
+ * offset within a block, and block checksums are sorted by their block ID.
+ *
+ * This class can be used to construct a consistent and completely filled {@link ContainerProtos.ContainerMerkleTree}
+ * object. It allows building a container merkle tree from scratch by incrementally adding chunks.
+ * The final checksums at higher levels of the tree are not calculated until
+ * {@link ContainerMerkleTree#toProto} is called.
+ */
+public class ContainerMerkleTree {
+
+ private final SortedMap<Long, BlockMerkleTree> id2Block;
+
+ /**
+ * Constructs an empty container merkle tree object.
+ */
+ public ContainerMerkleTree() {
+ id2Block = new TreeMap<>();
+ }
+
+ /**
+ * Adds chunks to a block in the tree. The block entry will be created if it is the first time adding chunks to it.
+ * If the block entry already exists, the chunks will be added to the existing chunks for that block.
+ *
+ * @param blockID The ID of the block that these chunks belong to.
+ * @param chunks A list of chunks to add to this block. The chunks will be sorted internally by their offset.
+ */
+ public void addChunks(long blockID, Collection<ChunkInfo> chunks) {
+ id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(chunks);
+ }
+
+ /**
+ * Uses chunk hashes to compute all remaining hashes in the tree, and returns it as a protobuf object. No checksum
+ * computation for the tree happens outside of this method.
+ *
+ * @return A complete protobuf object representation of this tree.
+ */
+ public ContainerProtos.ContainerMerkleTree toProto() {
+ // Compute checksums and return the result.
+ ContainerProtos.ContainerMerkleTree.Builder containerTreeBuilder = ContainerProtos.ContainerMerkleTree.newBuilder();
+ ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
+ ByteBuffer containerChecksumBuffer = ByteBuffer.allocate(Long.BYTES * id2Block.size());
+
+ for (BlockMerkleTree blockTree: id2Block.values()) {
+ ContainerProtos.BlockMerkleTree blockTreeProto = blockTree.toProto();
+ containerTreeBuilder.addBlockMerkleTree(blockTreeProto);
+ // Add the block's checksum to the buffer that will be used to calculate the container checksum.
+ containerChecksumBuffer.putLong(blockTreeProto.getBlockChecksum());
+ }
+ containerChecksumBuffer.flip();
+ checksumImpl.update(containerChecksumBuffer);
+
+ return containerTreeBuilder
+ .setDataChecksum(checksumImpl.getValue())
+ .build();
+ }
+
+ /**
+ * Represents a merkle tree for a single block within a container.
+ */
+ private static class BlockMerkleTree {
+ // Map of each offset within the block to its chunk info.
+ // Chunk order in the checksum is determined by their offset.
+ private final SortedMap<Long, ChunkMerkleTree> offset2Chunk;
+ private final long blockID;
+
+ BlockMerkleTree(long blockID) {
+ this.blockID = blockID;
+ this.offset2Chunk = new TreeMap<>();
+ }
+
+ /**
+ * Adds the specified chunks to this block. The offset value of the chunk must be unique within the block,
+ * otherwise it will overwrite the previous value at that offset.
+ *
+ * @param chunks A list of chunks to add to this block.
+ */
+ public void addChunks(Collection<ChunkInfo> chunks) {
+ for (ChunkInfo chunk: chunks) {
+ offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk));
+ }
+ }
+
+ /**
+ * Uses chunk hashes to compute a block hash for this tree, and returns it as a protobuf object. All block checksum
+ * computation for the tree happens within this method.
+ *
+ * @return A complete protobuf object representation of this block tree.
+ */
+ public ContainerProtos.BlockMerkleTree toProto() {
+ ContainerProtos.BlockMerkleTree.Builder blockTreeBuilder = ContainerProtos.BlockMerkleTree.newBuilder();
+ ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
+ ByteBuffer blockChecksumBuffer = ByteBuffer.allocate(Long.BYTES * offset2Chunk.size());
+
+ for (ChunkMerkleTree chunkTree: offset2Chunk.values()) {
+ // Ordering of checksums within a chunk is assumed to be in the order they are written.
+ // This assumption is already built into the code that reads and writes the values (see
+ // ChunkInputStream#validateChunk for an example on the client read path).
+ // There is no other value we can use to sort these checksums, so we assume the stored proto has them in the
+ // correct order.
+ ContainerProtos.ChunkMerkleTree chunkTreeProto = chunkTree.toProto();
+ blockTreeBuilder.addChunkMerkleTree(chunkTreeProto);
+ blockChecksumBuffer.putLong(chunkTreeProto.getChunkChecksum());
+ }
+ blockChecksumBuffer.flip();
+ checksumImpl.update(blockChecksumBuffer);
+
+ return blockTreeBuilder
+ .setBlockID(blockID)
+ .setBlockChecksum(checksumImpl.getValue())
+ .build();
+ }
+ }
+
+ /**
+ * Represents a merkle tree for a single chunk within a container.
+ * Each chunk has multiple checksums within it at each "bytesPerChecksum" interval.
+ * This class computes one checksum for the whole chunk by aggregating these.
+ */
+ private static class ChunkMerkleTree {
+ private final ChunkInfo chunk;
+
+ ChunkMerkleTree(ChunkInfo chunk) {
+ this.chunk = chunk;
+ }
+
+ /**
+ * Computes a single hash for this ChunkInfo object. All chunk level checksum computation happens within this
+ * method.
+ *
+ * @return A complete protobuf representation of this chunk as a leaf in the container merkle tree.
+ */
+ public ContainerProtos.ChunkMerkleTree toProto() {
+ ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
+ for (ByteString checksum: chunk.getChecksumData().getChecksums()) {
+ checksumImpl.update(checksum.asReadOnlyByteBuffer());
+ }
+
+ return ContainerProtos.ChunkMerkleTree.newBuilder()
+ .setOffset(chunk.getOffset())
+ .setLength(chunk.getLen())
+ .setChunkChecksum(checksumImpl.getValue())
+ .build();
+ }
+ }
+}
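The aggregation rule at every level of this tree is the same: serialize the already-sorted child checksums as longs and run CRC32 over them. A standalone sketch of that rule, using java.util.zip.CRC32 (the same reference implementation the tests below use); the input values are arbitrary placeholders, not real container checksums:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Demonstrates the order-dependent checksum aggregation used by ContainerMerkleTree#toProto.
public final class ChecksumAggregationSketch {
  public static void main(String[] args) {
    // Example child checksums (e.g. block checksums), already sorted by block ID.
    long[] childChecksums = {0xAAAAL, 0xBBBBL, 0xCCCCL};

    // Pack each child checksum into the buffer as a long, in order.
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * childChecksums.length);
    for (long checksum : childChecksums) {
      buffer.putLong(checksum);
    }
    buffer.flip();

    CRC32 crc32 = new CRC32();
    crc32.update(buffer);
    // Reordering childChecksums changes this value, which is why blocks are
    // sorted by ID and chunks by offset before aggregating.
    System.out.println("aggregate checksum: " + crc32.getValue());
  }
}
```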
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java
new file mode 100644
index 000000000000..9dfdc88bf1ec
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes handling container level checksums.
+ */
+package org.apache.hadoop.ozone.container.checksum;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index a8b0d8cfa4bc..28bbb17aa8f1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -74,6 +74,7 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
"hdds.datanode.wait.on.all.followers";
public static final String CONTAINER_SCHEMA_V3_ENABLED =
"hdds.datanode.container.schema.v3.enabled";
+ public static final String CONTAINER_CHECKSUM_LOCK_STRIPES_KEY = "hdds.datanode.container.checksum.lock.stripes";
static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
@@ -109,6 +110,7 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
"hdds.datanode.rocksdb.delete_obsolete_files_period";
public static final Boolean
OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT = false;
+ public static final int CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT = 127;
/**
* Number of threads per volume that Datanode will use for chunk read.
@@ -550,6 +552,21 @@ public void setWaitOnAllFollowers(boolean val) {
private boolean bCheckEmptyContainerDir =
OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT;
+ /**
+ * The number of lock stripes used to coordinate modifications to each
+ * container's checksum file.
+ */
+ @Config(key = "container.checksum.lock.stripes",
+ type = ConfigType.INT,
+ defaultValue = "127",
+ tags = { DATANODE },
+ description = "The number of lock stripes used to coordinate modifications to container checksum information. " +
+ "This information is only updated after a container is closed and does not affect the data read or write" +
+ " path. Each container in the datanode will be mapped to one lock which will only be held while its " +
+ "checksum information is updated."
+ )
+ private int containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
+
@PostConstruct
public void validate() {
if (containerDeleteThreads < 1) {
@@ -683,6 +700,12 @@ public void validate() {
rocksdbDeleteObsoleteFilesPeriod =
ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICRO_SECONDS_DEFAULT;
}
+
+ if (containerChecksumLockStripes < 1) {
+ LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CHECKSUM_LOCK_STRIPES_KEY,
+ CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT);
+ containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
+ }
}
public void setContainerDeleteThreads(int containerDeleteThreads) {
@@ -910,4 +933,8 @@ public int getAutoCompactionSmallSstFileNum() {
public void setAutoCompactionSmallSstFileNum(int num) {
this.autoCompactionSmallSstFileNum = num;
}
+
+ public int getContainerChecksumLockStripes() {
+ return containerChecksumLockStripes;
+ }
}
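A short sketch of overriding the new stripe-count property, assuming the usual Ozone pattern of materializing the typed config object from an OzoneConfiguration; the class name is invented for illustration:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

public final class LockStripeConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Raise the stripe count from the default of 127.
    conf.setInt(DatanodeConfiguration.CONTAINER_CHECKSUM_LOCK_STRIPES_KEY, 256);

    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
    // Prints 256. Values below 1 are reset to the default of 127 by validate().
    System.out.println(dnConf.getContainerChecksumLockStripes());
  }
}
```

More stripes reduce contention between containers that hash to the same lock, at the cost of a slightly larger lock table.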
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
new file mode 100644
index 000000000000..767eed8a73d3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.ozone.container.checksum.TestContainerMerkleTree.assertTreesSortedAndMatch;
+import static org.apache.hadoop.ozone.container.checksum.TestContainerMerkleTree.buildChunk;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestContainerChecksumTreeManager {
+
+ private static final long CONTAINER_ID = 1L;
+ @TempDir
+ private File testDir;
+ private KeyValueContainerData container;
+ private File checksumFile;
+ private ContainerChecksumTreeManager checksumManager;
+
+ @BeforeEach
+ public void init() {
+ container = mock(KeyValueContainerData.class);
+ when(container.getContainerID()).thenReturn(CONTAINER_ID);
+ when(container.getMetadataPath()).thenReturn(testDir.getAbsolutePath());
+ checksumFile = new File(testDir, CONTAINER_ID + ".tree");
+ checksumManager = new ContainerChecksumTreeManager(new DatanodeConfiguration());
+ }
+
+ @Test
+ public void testWriteEmptyTreeToFile() throws Exception {
+ checksumManager.writeContainerDataTree(container, new ContainerMerkleTree());
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+ ContainerProtos.ContainerMerkleTree treeProto = checksumInfo.getContainerMerkleTree();
+ assertEquals(0, treeProto.getDataChecksum());
+ assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+ }
+
+ @Test
+ public void testWriteEmptyBlockListToFile() throws Exception {
+ checksumManager.markBlocksAsDeleted(container, new TreeSet<>());
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+ ContainerProtos.ContainerMerkleTree treeProto = checksumInfo.getContainerMerkleTree();
+ assertEquals(0, treeProto.getDataChecksum());
+ assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+ }
+
+ @Test
+ public void testWriteOnlyTreeToFile() throws Exception {
+ ContainerMerkleTree tree = buildTestTree();
+ checksumManager.writeContainerDataTree(container, tree);
+
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+ // TestContainerMerkleTree verifies that going from ContainerMerkleTree to its proto is consistent.
+ // Therefore, we can use the proto version of our expected tree to check what was written to the file.
+ assertTreesSortedAndMatch(tree.toProto(), checksumInfo.getContainerMerkleTree());
+ }
+
+ @Test
+ public void testWriteOnlyDeletedBlocksToFile() throws Exception {
+ List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+ checksumManager.markBlocksAsDeleted(container, new TreeSet<>(expectedBlocksToDelete));
+
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+ ContainerProtos.ContainerMerkleTree treeProto = checksumInfo.getContainerMerkleTree();
+ assertEquals(0, treeProto.getDataChecksum());
+ assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+ }
+
+ @Test
+ public void testDeletedBlocksPreservedOnTreeWrite() throws Exception {
+ List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+ checksumManager.markBlocksAsDeleted(container, new TreeSet<>(expectedBlocksToDelete));
+ ContainerMerkleTree tree = buildTestTree();
+ checksumManager.writeContainerDataTree(container, tree);
+
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+ assertTreesSortedAndMatch(tree.toProto(), checksumInfo.getContainerMerkleTree());
+ }
+
+ @Test
+ public void testTreePreservedOnDeletedBlocksWrite() throws Exception {
+ ContainerMerkleTree tree = buildTestTree();
+ checksumManager.writeContainerDataTree(container, tree);
+ List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+ checksumManager.markBlocksAsDeleted(container, new TreeSet<>(expectedBlocksToDelete));
+
+ ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+ assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+ assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+ assertTreesSortedAndMatch(tree.toProto(), checksumInfo.getContainerMerkleTree());
+ }
+
+ @Test
+ public void testChecksumTreeFilePath() {
+ assertEquals(checksumFile.getAbsolutePath(), checksumManager.getContainerChecksumFile(container).getAbsolutePath());
+ }
+
+ private ContainerMerkleTree buildTestTree() throws Exception {
+ final long blockID1 = 1;
+ final long blockID2 = 2;
+ final long blockID3 = 3;
+ ChunkInfo b1c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b1c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{4, 5, 6}));
+ ChunkInfo b2c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{7, 8, 9}));
+ ChunkInfo b2c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{12, 11, 10}));
+ ChunkInfo b3c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{13, 14, 15}));
+ ChunkInfo b3c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{16, 17, 18}));
+
+ ContainerMerkleTree tree = new ContainerMerkleTree();
+ tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+ tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+ tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+
+ return tree;
+ }
+
+ private ContainerProtos.ContainerChecksumInfo readFile() throws IOException {
+ try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+ return ContainerProtos.ContainerChecksumInfo.parseFrom(inStream);
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java
new file mode 100644
index 000000000000..a93c4f170236
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestContainerMerkleTree {
+ private static final long CHUNK_SIZE = (long) new OzoneConfiguration().getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
+ private static final int BYTES_PER_CHECKSUM = new OzoneClientConfig().getBytesPerChecksum();
+
+ @Test
+ public void testBuildEmptyTree() {
+ ContainerMerkleTree tree = new ContainerMerkleTree();
+ ContainerProtos.ContainerMerkleTree treeProto = tree.toProto();
+ assertEquals(0, treeProto.getDataChecksum());
+ assertEquals(0, treeProto.getBlockMerkleTreeCount());
+ }
+
+ @Test
+ public void testBuildOneChunkTree() throws Exception {
+ // Seed the expected and actual trees with the same chunk.
+ final long blockID = 1;
+ ChunkInfo chunk = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+
+ // Build the expected tree proto using the test code.
+ ContainerProtos.ChunkMerkleTree chunkTree = buildExpectedChunkTree(chunk);
+ ContainerProtos.BlockMerkleTree blockTree = buildExpectedBlockTree(blockID,
+ Collections.singletonList(chunkTree));
+ ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(Collections.singletonList(blockTree));
+
+ // Use the ContainerMerkleTree to build the same tree.
+ ContainerMerkleTree actualTree = new ContainerMerkleTree();
+ actualTree.addChunks(blockID, Collections.singletonList(chunk));
+
+ // Ensure the trees match.
+ ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto();
+ assertTreesSortedAndMatch(expectedTree, actualTreeProto);
+
+ // Do some manual verification of the generated tree as well.
+ assertNotEquals(0, actualTreeProto.getDataChecksum());
+ assertEquals(1, actualTreeProto.getBlockMerkleTreeCount());
+
+ ContainerProtos.BlockMerkleTree actualBlockTree = actualTreeProto.getBlockMerkleTree(0);
+ assertEquals(1, actualBlockTree.getBlockID());
+ assertEquals(1, actualBlockTree.getChunkMerkleTreeCount());
+ assertNotEquals(0, actualBlockTree.getBlockChecksum());
+
+ ContainerProtos.ChunkMerkleTree actualChunkTree = actualBlockTree.getChunkMerkleTree(0);
+ assertEquals(0, actualChunkTree.getOffset());
+ assertEquals(CHUNK_SIZE, actualChunkTree.getLength());
+ assertNotEquals(0, actualChunkTree.getChunkChecksum());
+ }
+
+ @Test
+ public void testBuildTreeWithMissingChunks() throws Exception {
+ // These chunks will be used to seed both the expected and actual trees.
+ final long blockID = 1;
+ ChunkInfo chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ // Chunk 2 is missing.
+ ChunkInfo chunk3 = buildChunk(2, ByteBuffer.wrap(new byte[]{4, 5, 6}));
+
+ // Build the expected tree proto using the test code.
+ ContainerProtos.BlockMerkleTree blockTree = buildExpectedBlockTree(blockID,
+ Arrays.asList(buildExpectedChunkTree(chunk1), buildExpectedChunkTree(chunk3)));
+ ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(Collections.singletonList(blockTree));
+
+ // Use the ContainerMerkleTree to build the same tree.
+ ContainerMerkleTree actualTree = new ContainerMerkleTree();
+ actualTree.addChunks(blockID, Arrays.asList(chunk1, chunk3));
+
+ // Ensure the trees match.
+ ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto();
+ assertTreesSortedAndMatch(expectedTree, actualTreeProto);
+ }
+
+ /**
+ * A container is a set of blocks. Make sure the tree implementation is not dependent on continuity of block IDs.
+ */
+ @Test
+ public void testBuildTreeWithNonContiguousBlockIDs() throws Exception {
+ // Seed the expected and actual trees with the same chunks.
+ final long blockID1 = 1;
+ final long blockID3 = 3;
+ ChunkInfo b1c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b1c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b3c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b3c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+
+ // Build the expected tree proto using the test code.
+ ContainerProtos.BlockMerkleTree blockTree1 = buildExpectedBlockTree(blockID1,
+ Arrays.asList(buildExpectedChunkTree(b1c1), buildExpectedChunkTree(b1c2)));
+ ContainerProtos.BlockMerkleTree blockTree3 = buildExpectedBlockTree(blockID3,
+ Arrays.asList(buildExpectedChunkTree(b3c1), buildExpectedChunkTree(b3c2)));
+ ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(
+ Arrays.asList(blockTree1, blockTree3));
+
+ // Use the ContainerMerkleTree to build the same tree.
+ // Add blocks and chunks out of order to test sorting.
+ ContainerMerkleTree actualTree = new ContainerMerkleTree();
+ actualTree.addChunks(blockID3, Arrays.asList(b3c2, b3c1));
+ actualTree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+
+ // Ensure the trees match.
+ ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto();
+ assertTreesSortedAndMatch(expectedTree, actualTreeProto);
+ }
+
+ @Test
+ public void testAppendToBlocksWhileBuilding() throws Exception {
+ // Seed the expected and actual trees with the same chunks.
+ final long blockID1 = 1;
+ final long blockID2 = 2;
+ final long blockID3 = 3;
+ ChunkInfo b1c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b1c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2}));
+ ChunkInfo b1c3 = buildChunk(2, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b2c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b2c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b3c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1}));
+ ChunkInfo b3c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{2, 3, 4}));
+
+ // Build the expected tree proto using the test code.
+ ContainerProtos.BlockMerkleTree blockTree1 = buildExpectedBlockTree(blockID1,
+ Arrays.asList(buildExpectedChunkTree(b1c1), buildExpectedChunkTree(b1c2), buildExpectedChunkTree(b1c3)));
+ ContainerProtos.BlockMerkleTree blockTree2 = buildExpectedBlockTree(blockID2,
+ Arrays.asList(buildExpectedChunkTree(b2c1), buildExpectedChunkTree(b2c2)));
+ ContainerProtos.BlockMerkleTree blockTree3 = buildExpectedBlockTree(blockID3,
+ Arrays.asList(buildExpectedChunkTree(b3c1), buildExpectedChunkTree(b3c2)));
+ ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(
+ Arrays.asList(blockTree1, blockTree2, blockTree3));
+
+ // Use the ContainerMerkleTree to build the same tree.
+ // Test building by adding chunks to the blocks individually and out of order.
+ ContainerMerkleTree actualTree = new ContainerMerkleTree();
+ // Add all of block 2 first.
+ actualTree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+ // Then add block 1 in multiple steps with chunks out of order.
+ actualTree.addChunks(blockID1, Collections.singletonList(b1c2));
+ actualTree.addChunks(blockID1, Arrays.asList(b1c3, b1c1));
+ // Add a duplicate chunk to block 3. It should overwrite the existing one.
+ actualTree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+ actualTree.addChunks(blockID3, Collections.singletonList(b3c2));
+
+ // Ensure the trees match.
+ ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto();
+ assertTreesSortedAndMatch(expectedTree, actualTreeProto);
+ }
+
+ public static void assertTreesSortedAndMatch(ContainerProtos.ContainerMerkleTree expectedTree,
+ ContainerProtos.ContainerMerkleTree actualTree) {
+ assertEquals(expectedTree.getDataChecksum(), actualTree.getDataChecksum());
+ assertEquals(expectedTree.getBlockMerkleTreeCount(), actualTree.getBlockMerkleTreeCount());
+
+ long prevBlockID = -1;
+ for (int blockIndex = 0; blockIndex < expectedTree.getBlockMerkleTreeCount(); blockIndex++) {
+ ContainerProtos.BlockMerkleTree expectedBlockTree = expectedTree.getBlockMerkleTree(blockIndex);
+ ContainerProtos.BlockMerkleTree actualBlockTree = actualTree.getBlockMerkleTree(blockIndex);
+
+ // Blocks should be sorted by block ID.
+ long currentBlockID = actualBlockTree.getBlockID();
+ assertTrue(prevBlockID < currentBlockID);
+ prevBlockID = currentBlockID;
+
+ assertEquals(expectedBlockTree.getBlockID(), actualBlockTree.getBlockID());
+ assertEquals(expectedBlockTree.getBlockChecksum(), actualBlockTree.getBlockChecksum());
+
+ long prevChunkOffset = -1;
+ for (int chunkIndex = 0; chunkIndex < expectedBlockTree.getChunkMerkleTreeCount(); chunkIndex++) {
+ ContainerProtos.ChunkMerkleTree expectedChunkTree = expectedBlockTree.getChunkMerkleTree(chunkIndex);
+ ContainerProtos.ChunkMerkleTree actualChunkTree = actualBlockTree.getChunkMerkleTree(chunkIndex);
+
+ // Chunks should be sorted by offset.
+ long currentChunkOffset = actualChunkTree.getOffset();
+ assertTrue(prevChunkOffset < currentChunkOffset);
+ prevChunkOffset = currentChunkOffset;
+
+ assertEquals(expectedChunkTree.getOffset(), actualChunkTree.getOffset());
+ assertEquals(expectedChunkTree.getLength(), actualChunkTree.getLength());
+ assertEquals(expectedChunkTree.getChunkChecksum(), actualChunkTree.getChunkChecksum());
+ }
+ }
+ }
+
+ private ContainerProtos.ContainerMerkleTree buildExpectedContainerTree(List<ContainerProtos.BlockMerkleTree> blocks) {
+ return ContainerProtos.ContainerMerkleTree.newBuilder()
+ .addAllBlockMerkleTree(blocks)
+ .setDataChecksum(computeExpectedChecksum(
+ blocks.stream()
+ .map(ContainerProtos.BlockMerkleTree::getBlockChecksum)
+ .collect(Collectors.toList())))
+ .build();
+ }
+
+ private ContainerProtos.BlockMerkleTree buildExpectedBlockTree(long blockID,
+ List<ContainerProtos.ChunkMerkleTree> chunks) {
+ return ContainerProtos.BlockMerkleTree.newBuilder()
+ .setBlockID(blockID)
+ .setBlockChecksum(computeExpectedChecksum(
+ chunks.stream()
+ .map(ContainerProtos.ChunkMerkleTree::getChunkChecksum)
+ .collect(Collectors.toList())))
+ .addAllChunkMerkleTree(chunks)
+ .build();
+ }
+
+ private ContainerProtos.ChunkMerkleTree buildExpectedChunkTree(ChunkInfo chunk) {
+ return ContainerProtos.ChunkMerkleTree.newBuilder()
+ .setOffset(chunk.getOffset())
+ .setLength(chunk.getLen())
+ .setChunkChecksum(computeExpectedChunkChecksum(chunk.getChecksumData().getChecksums()))
+ .build();
+ }
+
+ /**
+ * Builds a ChunkInfo object using the provided information. No new checksums are calculated, so this can be used
+ * as either the leaves of pre-computed merkle trees that serve as expected values, or as building blocks to pass
+ * to ContainerMerkleTree to have it build the whole tree from this information.
+ *
+ * @param indexInBlock Which chunk number within a block this is. The chunk's offset is automatically calculated
+ * from this based on a fixed length.
+ * @param chunkChecksums The checksums within the chunk. Each is assumed to cover a fixed
+ * "bytesPerChecksum" amount of data, and the checksums are assumed to be contiguous.
+ * @return The ChunkInfo object built from this information.
+ */
+ public static ChunkInfo buildChunk(int indexInBlock, ByteBuffer... chunkChecksums) throws IOException {
+ // Each chunk checksum is added under the same ChecksumData object.
+ ContainerProtos.ChecksumData checksumData = ContainerProtos.ChecksumData.newBuilder()
+ .setType(ContainerProtos.ChecksumType.CRC32)
+ .setBytesPerChecksum(BYTES_PER_CHECKSUM)
+ .addAllChecksums(Arrays.stream(chunkChecksums)
+ .map(ByteString::copyFrom)
+ .collect(Collectors.toList()))
+ .build();
+
+ return ChunkInfo.getFromProtoBuf(
+ ContainerProtos.ChunkInfo.newBuilder()
+ .setChecksumData(checksumData)
+ .setChunkName("chunk")
+ .setOffset(indexInBlock * CHUNK_SIZE)
+ .setLen(CHUNK_SIZE)
+ .build());
+ }
+
+ private long computeExpectedChecksum(List<Long> checksums) {
+ CRC32 crc32 = new CRC32();
+ ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES * checksums.size());
+ checksums.forEach(longBuffer::putLong);
+ longBuffer.flip();
+ crc32.update(longBuffer);
+ return crc32.getValue();
+ }
+
+ private long computeExpectedChunkChecksum(List<ByteString> checksums) {
+ CRC32 crc32 = new CRC32();
+ checksums.forEach(b -> crc32.update(b.asReadOnlyByteBuffer()));
+ return crc32.getValue();
+ }
+}
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 7755b993caea..833159c84ec1 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -527,6 +527,30 @@ message SendContainerRequest {
message SendContainerResponse {
}
+// Each chunk contains multiple checksums. This message aggregates them into one checksum for the whole chunk.
+message ChunkMerkleTree {
+ optional int64 offset = 1;
+ optional int64 length = 2;
+ optional int64 chunkChecksum = 3;
+}
+
+message BlockMerkleTree {
+ optional int64 blockID = 1;
+ optional int64 blockChecksum = 2;
+ repeated ChunkMerkleTree chunkMerkleTree = 3;
+}
+
+message ContainerMerkleTree {
+ optional int64 dataChecksum = 1;
+ repeated BlockMerkleTree blockMerkleTree = 2;
+}
+
+message ContainerChecksumInfo {
+ optional int64 containerID = 1;
+ optional ContainerMerkleTree containerMerkleTree = 2;
+ repeated int64 deletedBlocks = 3;
+}
+
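To make the nesting of these messages concrete, here is a sketch of building a ContainerChecksumInfo with the generated Java builders. The checksum values are placeholders rather than real CRC32 outputs, and the wrapper class is invented for the example:

```java
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public final class ChecksumProtoSketch {
  public static ContainerProtos.ContainerChecksumInfo buildExample() {
    // One container, containing one block, containing one chunk.
    return ContainerProtos.ContainerChecksumInfo.newBuilder()
        .setContainerID(1L)
        .setContainerMerkleTree(ContainerProtos.ContainerMerkleTree.newBuilder()
            .setDataChecksum(12345L)              // aggregate of the block checksums below
            .addBlockMerkleTree(ContainerProtos.BlockMerkleTree.newBuilder()
                .setBlockID(1L)
                .setBlockChecksum(6789L)          // aggregate of the chunk checksums below
                .addChunkMerkleTree(ContainerProtos.ChunkMerkleTree.newBuilder()
                    .setOffset(0)
                    .setLength(4096)
                    .setChunkChecksum(42L))))     // aggregate of the chunk's stored checksums
        .addDeletedBlocks(2L)                     // block IDs deleted from this replica
        .build();
  }
}
```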
service XceiverClientProtocolService {
// A client-to-datanode RPC to send container commands
rpc send(stream ContainerCommandRequestProto) returns