Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1efd557
Dedup file existence method
errose28 Jun 4, 2025
72bd717
Remove unnecessary methods
errose28 Jun 4, 2025
75fbbf9
Clarify when a container is missing the data checksum
errose28 Jun 4, 2025
f200716
Checkstyle
errose28 Jun 4, 2025
ab961d7
Add test that metadata checksum matches data checksum when container …
errose28 Jun 9, 2025
83499b7
Update some failing tests
errose28 Jun 9, 2025
b10a4f6
Add warn log when generating diff and skipping peer's unhealthy chunks
errose28 Jun 10, 2025
87c81a5
Add todos during review
errose28 Jun 12, 2025
1c5e913
Add container ID missed in checksum update log message
errose28 Jun 12, 2025
fadd33d
Address review comments
errose28 Jun 12, 2025
474b0d6
closing container should not have checksum generated yet
errose28 Jun 13, 2025
7d110c4
Fix logging condition
errose28 Jun 16, 2025
e9fdc30
Fix tests with needsChecksum method. Logging tests still fail
errose28 Jun 17, 2025
d713db4
Remove logging checks, tests pass
errose28 Jun 17, 2025
a76dd0a
Checkstyle
errose28 Jun 17, 2025
d7af72f
Undo old changes
errose28 Jun 17, 2025
5c0d078
Update comments based on diff
errose28 Jun 17, 2025
b248828
Merge branch 'master' into HDDS-13083-handle-empty-mt
errose28 Jun 26, 2025
2fd102b
Tag unrelated flaky test
errose28 Jun 27, 2025
b776746
Update unit test
errose28 Jul 22, 2025
1a4c40f
Update based on comment
errose28 Jul 22, 2025
6b4188f
Merge branch 'master' into HDDS-13083-handle-empty-mt
errose28 Jul 22, 2025
6ad547f
Remove tag of fixed flaky test
errose28 Jul 22, 2025
ad81d2d
Remove flaky tag
errose28 Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -94,15 +94,7 @@ public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerDat
Lock writeLock = getLock(containerID);
writeLock.lock();
try {
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
try {
// If the file is not present, we will create the data for the first time. This happens under a write lock.
checksumInfoBuilder = readBuilder(data).orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
} catch (IOException ex) {
LOG.error("Failed to read container checksum tree file for container {}. Creating a new instance.",
containerID, ex);
checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
}
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = readOrCreate(data).toBuilder();

ContainerProtos.ContainerMerkleTree treeProto = captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
tree::toProto);
Expand All @@ -129,16 +121,7 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
Lock writeLock = getLock(containerID);
writeLock.lock();
try {
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
try {
// If the file is not present, we will create the data for the first time. This happens under a write lock.
checksumInfoBuilder = readBuilder(data)
.orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
} catch (IOException ex) {
LOG.error("Failed to read container checksum tree file for container {}. Overwriting it with a new instance.",
data.getContainerID(), ex);
checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
}
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = readOrCreate(data).toBuilder();

// Although the persisted block list should already be sorted, we will sort it here to make sure.
// This will automatically fix any bugs in the persisted order that may show up.
Expand Down Expand Up @@ -179,7 +162,7 @@ public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecks
ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws
StorageContainerException {

ContainerDiffReport report = new ContainerDiffReport();
ContainerDiffReport report = new ContainerDiffReport(thisChecksumInfo.getContainerID());
try {
captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum info is null.");
Expand Down Expand Up @@ -280,6 +263,8 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
List<ContainerProtos.ChunkMerkleTree> thisChunkMerkleTreeList = thisBlockMerkleTree.getChunkMerkleTreeList();
List<ContainerProtos.ChunkMerkleTree> peerChunkMerkleTreeList = peerBlockMerkleTree.getChunkMerkleTreeList();
int thisIdx = 0, peerIdx = 0;
long containerID = report.getContainerID();
long blockID = thisBlockMerkleTree.getBlockID();

// Step 1: Process both lists while elements are present in both
while (thisIdx < thisChunkMerkleTreeList.size() && peerIdx < peerChunkMerkleTreeList.size()) {
Expand All @@ -293,8 +278,8 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
// thisTree = Healthy, peerTree = unhealthy -> Do nothing as thisTree is healthy.
// thisTree = Unhealthy, peerTree = Unhealthy -> Do Nothing as both are corrupt.
if (thisChunkMerkleTree.getDataChecksum() != peerChunkMerkleTree.getDataChecksum() &&
!thisChunkMerkleTree.getChecksumMatches() && peerChunkMerkleTree.getChecksumMatches()) {
report.addCorruptChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
!thisChunkMerkleTree.getChecksumMatches()) {
reportChunkIfHealthy(containerID, blockID, peerChunkMerkleTree, report::addCorruptChunk);
}
thisIdx++;
peerIdx++;
Expand All @@ -304,30 +289,33 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
thisIdx++;
} else {
// Peer chunk's offset is smaller; record missing chunk and advance peerIdx
report.addMissingChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
reportChunkIfHealthy(containerID, blockID, peerChunkMerkleTree, report::addMissingChunk);
peerIdx++;
}
}

// Step 2: Process remaining chunks in the peer list
while (peerIdx < peerChunkMerkleTreeList.size()) {
report.addMissingChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTreeList.get(peerIdx));
reportChunkIfHealthy(containerID, blockID, peerChunkMerkleTreeList.get(peerIdx), report::addMissingChunk);
peerIdx++;
}

// If we have remaining chunks in thisBlockMerkleTree, we can skip these chunks. The peers will pick these
// chunks from us when they reconcile.
}

public static long getDataChecksum(ContainerProtos.ContainerChecksumInfo checksumInfo) {
return checksumInfo.getContainerMerkleTree().getDataChecksum();
private void reportChunkIfHealthy(long containerID, long blockID, ContainerProtos.ChunkMerkleTree peerTree,
BiConsumer<Long, ContainerProtos.ChunkMerkleTree> addToReport) {
if (peerTree.getChecksumMatches()) {
addToReport.accept(blockID, peerTree);
} else {
LOG.warn("Skipping chunk at offset {} in block {} of container {} since peer reported it as " +
"unhealthy.", peerTree.getOffset(), blockID, containerID);
}
}

/**
* Returns whether the container checksum tree file for the specified container exists without deserializing it.
*/
public static boolean hasContainerChecksumFile(ContainerData data) {
return getContainerChecksumFile(data).exists();
public static long getDataChecksum(ContainerProtos.ContainerChecksumInfo checksumInfo) {
return checksumInfo.getContainerMerkleTree().getDataChecksum();
}

/**
Expand All @@ -348,21 +336,34 @@ private Lock getLock(long containerID) {
}

/**
* Reads the checksum info of the specified container. If the tree file with the information does not exist, an empty
* instance is returned.
* Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
* swapped into place.
*/
public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
public ContainerProtos.ContainerChecksumInfo read(ContainerData data) throws IOException {
try {
return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () -> readChecksumInfo(data));
return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () ->
readChecksumInfo(data).orElse(ContainerProtos.ContainerChecksumInfo.newBuilder().build()));
} catch (IOException ex) {
metrics.incrementMerkleTreeReadFailures();
throw new IOException(ex);
throw ex;
}
}

private Optional<ContainerProtos.ContainerChecksumInfo.Builder> readBuilder(ContainerData data) throws IOException {
Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = read(data);
return checksumInfo.map(ContainerProtos.ContainerChecksumInfo::toBuilder);
/**
* Reads the checksum info of the specified container. If the tree file with the information does not exist, or there
* is an exception trying to read the file, an empty instance is returned.
*/
private ContainerProtos.ContainerChecksumInfo readOrCreate(ContainerData data) {
try {
// If the file is not present, we will create the data for the first time. This happens under a write lock.
return read(data);
} catch (IOException ex) {
LOG.error("Failed to read container checksum tree file for container {}. Overwriting it with a new instance.",
data.getContainerID(), ex);
return ContainerProtos.ContainerChecksumInfo.newBuilder().build();
}
}

/**
Expand Down Expand Up @@ -443,10 +444,4 @@ public static Optional<ContainerProtos.ContainerChecksumInfo> readChecksumInfo(C
public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
}

public static boolean checksumFileExist(Container<?> container) {
File checksumFile = getContainerChecksumFile(container.getContainerData());
return checksumFile.exists();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ public class ContainerDiffReport {
private final List<ContainerProtos.BlockMerkleTree> missingBlocks;
private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> missingChunks;
private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> corruptChunks;
private final long containerID;

public ContainerDiffReport() {
public ContainerDiffReport(long containerID) {
this.missingBlocks = new ArrayList<>();
this.missingChunks = new HashMap<>();
this.corruptChunks = new HashMap<>();
this.containerID = containerID;
}

public long getContainerID() {
return containerID;
}

/**
Expand Down Expand Up @@ -105,7 +111,7 @@ public long getNumMissingBlocks() {

@Override
public String toString() {
return "ContainerDiffReport:" +
return "Diff report for container " + containerID + ":" +
" Missing Blocks: " + getNumMissingBlocks() +
" Missing Chunks: " + getNumMissingChunks() + " chunks from " + missingChunks.size() + " blocks" +
" Corrupt Chunks: " + getNumCorruptChunks() + " chunks from " + corruptChunks.size() + " blocks";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public abstract class ContainerData {

// Checksum of the data within the container.
private long dataChecksum;
private static final long UNSET_DATA_CHECKSUM = -1;

private boolean isEmpty;

Expand Down Expand Up @@ -153,7 +154,7 @@ protected ContainerData(ContainerType type, long containerId,
this.originNodeId = originNodeId;
this.isEmpty = false;
this.checksum = ZERO_CHECKSUM;
this.dataChecksum = 0;
this.dataChecksum = UNSET_DATA_CHECKSUM;
}

protected ContainerData(ContainerData source) {
Expand Down Expand Up @@ -538,13 +539,24 @@ public void computeAndSetContainerFileChecksum(Yaml yaml) throws IOException {
}

public void setDataChecksum(long dataChecksum) {
if (dataChecksum < 0) {
throw new IllegalArgumentException("Data checksum cannot be set to a negative number.");
}
this.dataChecksum = dataChecksum;
}

public long getDataChecksum() {
// UNSET_DATA_CHECKSUM is an internal placeholder, it should not be used outside this class.
if (needsDataChecksum()) {
return 0;
}
return dataChecksum;
}

public boolean needsDataChecksum() {
return dataChecksum == UNSET_DATA_CHECKSUM;
}

/**
* Returns a ProtoBuf Message from ContainerData.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -1368,7 +1367,6 @@ public void markContainerForClose(Container container)
} finally {
container.writeUnlock();
}
updateContainerChecksumFromMetadataIfNeeded(container);
ContainerLogger.logClosing(container.getContainerData());
sendICR(container);
}
Expand All @@ -1383,15 +1381,15 @@ public void updateContainerChecksum(Container container, ContainerMerkleTreeWrit
* Write the merkle tree for this container using the existing checksum metadata only. The data is not read or
* validated by this method, so it is expected to run quickly.
* <p>
* If a checksum file already exists on the disk, this method will do nothing. The existing file would have either
* If a data checksum for the container already exists, this method does nothing. The existing value would have either
* been made from the metadata or data itself so there is no need to recreate it from the metadata. This method
* does not send an ICR with the updated checksum info.
* <p>
*
* @param container The container which will have a tree generated.
*/
private void updateContainerChecksumFromMetadataIfNeeded(Container container) {
if (ContainerChecksumTreeManager.checksumFileExist(container)) {
if (!container.getContainerData().needsDataChecksum()) {
return;
}

Expand Down Expand Up @@ -1435,24 +1433,24 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Cont
// checksum to prevent divergence from what SCM sees in the ICR vs what datanode peers will see when pulling the
// merkle tree.
long originalDataChecksum = containerData.getDataChecksum();
boolean hadDataChecksum = !containerData.needsDataChecksum();
ContainerProtos.ContainerChecksumInfo updateChecksumInfo = checksumManager.writeContainerDataTree(containerData,
treeWriter);
long updatedDataChecksum = updateChecksumInfo.getContainerMerkleTree().getDataChecksum();

if (updatedDataChecksum != originalDataChecksum) {
containerData.setDataChecksum(updatedDataChecksum);
String message =
"Container data checksum updated from " + checksumToString(originalDataChecksum) + " to " +
checksumToString(updatedDataChecksum);
if (sendICR) {
sendICR(container);
}
if (ContainerChecksumTreeManager.hasContainerChecksumFile(containerData)) {

String message = "Container " + containerData.getContainerID() + " data checksum updated from " +
checksumToString(originalDataChecksum) + " to " + checksumToString(updatedDataChecksum);
if (hadDataChecksum) {
LOG.warn(message);
ContainerLogger.logChecksumUpdated(containerData, originalDataChecksum);
} else {
// If this is the first time the scanner has run with the feature to generate a checksum file, don't
// log a warning for the checksum update.
// If this is the first time the checksum is being generated, don't log a warning about updating the checksum.
LOG.debug(message);
}
}
Expand All @@ -1465,7 +1463,6 @@ public void markContainerUnhealthy(Container container, ScanResult reason)
container.writeLock();
long containerID = 0L;
try {
containerID = container.getContainerData().getContainerID();
if (container.getContainerState() == State.UNHEALTHY) {
LOG.debug("Call to mark already unhealthy container {} as unhealthy",
containerID);
Expand Down Expand Up @@ -1572,12 +1569,9 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
long containerID = containerData.getContainerID();

// Obtain the original checksum info before reconciling with any peers.
Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
ContainerProtos.ContainerChecksumInfo originalChecksumInfo;
if (optionalChecksumInfo.isPresent()) {
originalChecksumInfo = optionalChecksumInfo.get();
} else {
// Try creating the checksum info from RocksDB metadata if it is not present.
ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData);
if (!originalChecksumInfo.hasContainerMerkleTree()) {
// Try creating the merkle tree from RocksDB metadata if it is not present.
originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer);
}
// This holds our current most up-to-date checksum info that we are using for the container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public static ContainerMerkleTreeWriter buildTestTree(ConfigurationSource conf,
int numCorruptChunks) {

ContainerProtos.ContainerMerkleTree.Builder treeBuilder = originalTree.toProto().toBuilder();
ContainerDiffReport diff = new ContainerDiffReport();
ContainerDiffReport diff = new ContainerDiffReport(1);

introduceMissingBlocks(treeBuilder, numMissingBlocks, diff);
introduceMissingChunks(treeBuilder, numMissingChunks, diff);
Expand Down Expand Up @@ -323,12 +323,12 @@ private static void assertEqualsChunkMerkleTree(List<ContainerProtos.ChunkMerkle
}

/**
* This function checks whether the container checksum file exists.
* This function checks whether the container checksum file exists for a container in a given datanode.
*/
public static boolean containerChecksumFileExists(HddsDatanodeService hddsDatanode, long containerID) {
OzoneContainer ozoneContainer = hddsDatanode.getDatanodeStateMachine().getContainer();
Container<?> container = ozoneContainer.getController().getContainer(containerID);
return ContainerChecksumTreeManager.checksumFileExist(container);
return getContainerChecksumFile(container.getContainerData()).exists();
}

public static void writeContainerDataTreeProto(ContainerData data, ContainerProtos.ContainerMerkleTree tree)
Expand Down
Loading