From b755e48032161bb795b656a922abd04af97b94ea Mon Sep 17 00:00:00 2001
From: Aswin Shakil Balasubramanian
Date: Tue, 5 Aug 2025 22:39:44 -0700
Subject: [PATCH 1/5] HDDS-13519. Reconciliation should continue if a peer datanode is unreachable

---
 .../container/keyvalue/KeyValueHandler.java   | 18 ++++--
 ...tainerReconciliationWithMockDatanodes.java | 61 +++++++++++++++++++
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index b9c80ce213b2..7b9ef9e7c1e7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1594,11 +1594,19 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container
           containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
       // Data checksum updated after each peer reconciles.
       long start = Instant.now().toEpochMilli();
-      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
-          containerID, peer);
-      if (peerChecksumInfo == null) {
-        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
-            containerID, peer);
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo;
+
+      try {
+        // Data checksum updated after each peer reconciles.
+        peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID, peer);
+        if (peerChecksumInfo == null) {
+          LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+              containerID, peer);
+          continue;
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to connect to peer {} for container {} reconciliation. Skipping to next peer.",
Skipping to next peer.", + peer, containerID, e); continue; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index d567e209d420..b0409fae21e5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -89,6 +89,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -228,6 +229,66 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo assertEquals(healthyDataChecksum, repairedDataChecksum); } + @Test + public void testContainerReconciliationWithPeerFailure() throws Exception { + LOG.info("Testing container reconciliation with peer failure for container {}", CONTAINER_ID); + // Introduce corruption in the first datanode + MockDatanode corruptedNode = datanodes.get(0); + MockDatanode healthyNode1 = datanodes.get(1); + MockDatanode healthyNode2 = datanodes.get(2); + corruptedNode.introduceCorruption(CONTAINER_ID, 1, 1, false); + + // Use synchronous on-demand scans to re-build the merkle trees after corruption. + datanodes.forEach(d -> d.scanContainer(CONTAINER_ID)); + + // Without reconciliation, checksums should be different. + assertUniqueChecksumCount(CONTAINER_ID, datanodes, 2); + waitForExpectedScanCount(1); + + // Create a failing peer - we'll make the second datanode fail during getContainerChecksumInfo + DatanodeDetails failingPeerDetails = healthyNode1.getDnDetails(); + Map dnMap = datanodes.stream() + .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity())); + + containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + long containerID = inv.getArgument(1); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + + // Throw exception for the specific failing peer + if (dn.equals(failingPeerDetails)) { + throw new IOException("Simulated peer failure for testing"); + } + + return dnMap.get(dn).getChecksumInfo(containerID); + }); + + // Now reconcile the corrupted node with its peers (including the failing one) + List peers = Arrays.asList(failingPeerDetails, healthyNode2.getDnDetails()); + corruptedNode.reconcileContainer(dnClient, peers, CONTAINER_ID); + + // Wait for scan to complete - but this time we only expect the corrupted node to have a scan + // triggered by reconciliation, so we wait specifically for that one + try { + GenericTestUtils.waitFor(() -> corruptedNode.getOnDemandScanCount() == 2, 100, 5_000); + } catch (TimeoutException ex) { + LOG.warn("Timed out waiting for on-demand scan after reconciliation. 
Current count: {}", + corruptedNode.getOnDemandScanCount()); + } + + // The corrupted node should still be repaired because it was able to reconcile with the healthy peer + // even though one peer failed + long corruptedChecksum = corruptedNode.checkAndGetDataChecksum(CONTAINER_ID); + long healthyChecksum = healthyNode2.checkAndGetDataChecksum(CONTAINER_ID); + assertEquals(healthyChecksum, corruptedChecksum); + + // Restore the original mock behavior for other tests + mockContainerProtocolCalls(); + } + /** * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on * every datanode. From 66c6c23584f31956cd5a128bc643114066d6363d Mon Sep 17 00:00:00 2001 From: Aswin Shakil Balasubramanian Date: Fri, 8 Aug 2025 12:53:56 -0700 Subject: [PATCH 2/5] Update test case and fix failures. --- .../container/keyvalue/KeyValueHandler.java | 190 +++++++++--------- ...tainerReconciliationWithMockDatanodes.java | 95 ++++++--- 2 files changed, 161 insertions(+), 124 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 7b9ef9e7c1e7..1844f4070d15 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1582,21 +1582,22 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); for (DatanodeDetails peer : peers) { - long numMissingBlocksRepaired = 0; - long numCorruptChunksRepaired = 0; - long numMissingChunksRepaired = 0; - // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the - // next peer. - ContainerMerkleTreeWriter updatedTreeWriter = - new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); - - LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", - containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); - // Data checksum updated after each peer reconciles. - long start = Instant.now().toEpochMilli(); - ContainerProtos.ContainerChecksumInfo peerChecksumInfo; - try { + long numMissingBlocksRepaired = 0; + long numCorruptChunksRepaired = 0; + long numMissingChunksRepaired = 0; + // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with + // the next peer. + ContainerMerkleTreeWriter updatedTreeWriter = + new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); + + LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", + containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); + // Data checksum updated after each peer reconciles. + long start = Instant.now().toEpochMilli(); + ContainerProtos.ContainerChecksumInfo peerChecksumInfo; + + // Data checksum updated after each peer reconciles. 
         peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID, peer);
         if (peerChecksumInfo == null) {
@@ -1604,100 +1605,99 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container
               containerID, peer);
           continue;
         }
-      } catch (Exception e) {
-        LOG.error("Failed to connect to peer {} for container {} reconciliation. Skipping to next peer.",
-            peer, containerID, e);
-        continue;
-      }
-      ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
-      Pipeline pipeline = createSingleNodePipeline(peer);
+        ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
+        Pipeline pipeline = createSingleNodePipeline(peer);
+
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+          long localID = missingBlock.getBlockID();
+          BlockID blockID = new BlockID(containerID, localID);
+          if (getBlockManager().blockExists(container, blockID)) {
+            LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
+                "present. Our container merkle tree is stale.", localID, containerID);
+          } else {
+            try {
+              long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
+                  missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
+              if (chunksInBlockRetrieved != 0) {
+                allBlocksUpdated.add(localID);
+                numMissingBlocksRepaired++;
+              }
+            } catch (IOException e) {
+              LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+                  containerID, e);
+            }
+          }
+        }
-      // Handle missing blocks
-      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
-        long localID = missingBlock.getBlockID();
-        BlockID blockID = new BlockID(containerID, localID);
-        if (getBlockManager().blockExists(container, blockID)) {
-          LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
-              "present. Our container merkle tree is stale.", localID, containerID);
Our container merkle tree is stale.", localID, containerID); - } else { + // Handle missing chunks + for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { + long localID = entry.getKey(); try { - long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID, - missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer); - if (chunksInBlockRetrieved != 0) { + long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (missingChunksRepaired != 0) { allBlocksUpdated.add(localID); - numMissingBlocksRepaired++; + numMissingChunksRepaired += missingChunksRepaired; } } catch (IOException e) { - LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), + LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), containerID, e); } } - } - // Handle missing chunks - for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { - long localID = entry.getKey(); - try { - long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (missingChunksRepaired != 0) { - allBlocksUpdated.add(localID); - numMissingChunksRepaired += missingChunksRepaired; + // Handle corrupt chunks + for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { + long localID = entry.getKey(); + try { + long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (corruptChunksRepaired != 0) { + allBlocksUpdated.add(localID); + numCorruptChunksRepaired += corruptChunksRepaired; + } + } catch (IOException e) { + LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), + containerID, e); } - } catch (IOException e) { - LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), - containerID, e); } - } - // Handle corrupt chunks - for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { - long localID = entry.getKey(); - try { - long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (corruptChunksRepaired != 0) { - allBlocksUpdated.add(localID); - numCorruptChunksRepaired += corruptChunksRepaired; + // Based on repaired done with this peer, write the updated merkle tree to the container. + // This updated tree will be used when we reconcile with the next peer. + ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; + latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); + + // Log the results of reconciliation with this peer. + long duration = Instant.now().toEpochMilli() - start; + long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); + long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); + if (previousDataChecksum == latestDataChecksum) { + if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { + // This condition should never happen. 
+ LOG.error("Checksum of container was not updated but blocks were repaired."); } - } catch (IOException e) { - LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), - containerID, e); + LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), duration); + } else { + LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" + + ".\nMissing blocks repaired: {}/{}\n" + + "Missing chunks repaired: {}/{}\n" + + "Corrupt chunks repaired: {}/{}\n" + + "Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), + numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), + numMissingChunksRepaired, diffReport.getMissingChunks().size(), + numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), + duration); } - } - // Based on repaired done with this peer, write the updated merkle tree to the container. - // This updated tree will be used when we reconcile with the next peer. - ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; - latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); - - // Log the results of reconciliation with this peer. - long duration = Instant.now().toEpochMilli() - start; - long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); - long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); - if (previousDataChecksum == latestDataChecksum) { - if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { - // This condition should never happen. - LOG.error("Checksum of container was not updated but blocks were repaired."); - } - LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), duration); - } else { - LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" + - ".\nMissing blocks repaired: {}/{}\n" + - "Missing chunks repaired: {}/{}\n" + - "Corrupt chunks repaired: {}/{}\n" + - "Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), - numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), - numMissingChunksRepaired, diffReport.getMissingChunks().size(), - numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), - duration); + ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); + successfulPeerCount++; + } catch (Exception e) { + LOG.error("Failed to reconcile with peer {} for container #{}. Skipping to next peer.", + peer, containerID, e); } - - ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); - successfulPeerCount++; } // Log a summary after reconciling with all peers. @@ -1717,8 +1717,7 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late } // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. 
-    containerSet.scanContainerWithoutGap(containerID,
-        "Container reconciliation");
+    containerSet.scanContainerWithoutGap(containerID, "Container reconciliation");
     sendICR(container);
   }
 
@@ -1831,6 +1830,10 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
           chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
           writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
           localOffset2Chunk.put(chunkOffset, chunkInfoProto);
+          // Update the treeWriter to reflect the current state of blocks and chunks on disk after successful
+          // chunk writes. If putBlockForClosedContainer fails, the container scanner will later update the
+          // Merkle tree to resolve any discrepancies.
+          treeWriter.addChunks(localID, true, chunkInfoProto);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Successfully ingested chunk at offset {} into block {} of container {} from peer {}",
                 chunkOffset, localID, containerID, peer);
@@ -1854,7 +1857,6 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
     List<ContainerProtos.ChunkInfo> allChunks = new ArrayList<>(localOffset2Chunk.values());
     localBlockData.setChunks(allChunks);
     putBlockForClosedContainer(container, localBlockData, maxBcsId, allChunksSuccessful);
-    treeWriter.addChunks(localID, true, allChunks);
     // Invalidate the file handle cache, so new read requests get the new file if one was created.
     chunkManager.finishWriteChunks(container, localBlockData);
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
index b0409fae21e5..f50e47c545f6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
@@ -59,7 +60,6 @@
 import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.text.RandomStringGenerator;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -89,7 +89,6 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -197,7 +196,7 @@ public static void teardown() {
   @MethodSource("corruptionValues")
   public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception {
     LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID,
-        HddsUtils.checksumToString(healthyDataChecksum));
+        checksumToString(healthyDataChecksum));
     // Introduce corruption in each container on different replicas.
     List<MockDatanode> dnsToCorrupt = datanodes.stream().limit(2).collect(Collectors.toList());
@@ -229,13 +228,47 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo
     assertEquals(healthyDataChecksum, repairedDataChecksum);
   }
 
-  @Test
-  public void testContainerReconciliationWithPeerFailure() throws Exception {
-    LOG.info("Testing container reconciliation with peer failure for container {}", CONTAINER_ID);
+  /**
+   * Enum to represent different failure modes for container protocol calls.
+   */
+  public enum FailureLocation {
+    GET_CONTAINER_CHECKSUM_INFO("getContainerChecksumInfo"),
+    GET_BLOCK("getBlock"),
+    READ_CHUNK("readChunk");
+
+    private final String methodName;
+
+    FailureLocation(String methodName) {
+      this.methodName = methodName;
+    }
+
+    public String getMethodName() {
+      return methodName;
+    }
+  }
+
+  /**
+   * Provides test parameters for different failure modes.
+   */
+  public static Stream<Arguments> failureLocations() {
+    return Stream.of(
+        Arguments.of(FailureLocation.GET_CONTAINER_CHECKSUM_INFO),
+        Arguments.of(FailureLocation.GET_BLOCK),
+        Arguments.of(FailureLocation.READ_CHUNK)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("failureLocations")
+  public void testContainerReconciliationWithPeerFailure(FailureLocation failureLocation) throws Exception {
+    LOG.info("Testing container reconciliation with peer failure in {} for container {}",
+        failureLocation.getMethodName(), CONTAINER_ID);
+
     // Introduce corruption in the first datanode
     MockDatanode corruptedNode = datanodes.get(0);
     MockDatanode healthyNode1 = datanodes.get(1);
     MockDatanode healthyNode2 = datanodes.get(2);
+    healthyDataChecksum = healthyNode1.checkAndGetDataChecksum(CONTAINER_ID);
     corruptedNode.introduceCorruption(CONTAINER_ID, 1, 1, false);
 
     // Use synchronous on-demand scans to re-build the merkle trees after corruption.
@@ -245,26 +278,10 @@ public void testContainerReconciliationWithPeerFailure() throws Exception {
     // Without reconciliation, checksums should be different.
     assertUniqueChecksumCount(CONTAINER_ID, datanodes, 2);
     waitForExpectedScanCount(1);
 
-    // Create a failing peer - we'll make the second datanode fail during getContainerChecksumInfo
+    // Create a failing peer - we'll make the second datanode fail during the specified operation
     DatanodeDetails failingPeerDetails = healthyNode1.getDnDetails();
-    Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
-        .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
-
-    containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
-        .thenAnswer(inv -> {
-          XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
-          long containerID = inv.getArgument(1);
-          Pipeline pipeline = xceiverClientSpi.getPipeline();
-          assertEquals(1, pipeline.size());
-          DatanodeDetails dn = pipeline.getFirstNode();
-
-          // Throw exception for the specific failing peer
-          if (dn.equals(failingPeerDetails)) {
-            throw new IOException("Simulated peer failure for testing");
-          }
-
-          return dnMap.get(dn).getChecksumInfo(containerID);
-        });
+    // Mock the failure for the specific method based on the failure mode
+    mockContainerProtocolCalls(failureLocation, failingPeerDetails);
 
     // Now reconcile the corrupted node with its peers (including the failing one)
     List<DatanodeDetails> peers = Arrays.asList(failingPeerDetails, healthyNode2.getDnDetails());
     corruptedNode.reconcileContainer(dnClient, peers, CONTAINER_ID);
@@ -281,9 +298,8 @@ public void testContainerReconciliationWithPeerFailure() throws Exception {
 
     // The corrupted node should still be repaired because it was able to reconcile with the healthy peer
     // even though one peer failed
-    long corruptedChecksum = corruptedNode.checkAndGetDataChecksum(CONTAINER_ID);
-    long healthyChecksum = healthyNode2.checkAndGetDataChecksum(CONTAINER_ID);
-    assertEquals(healthyChecksum, corruptedChecksum);
+    long repairedDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1);
+    assertEquals(healthyDataChecksum, repairedDataChecksum);
 
     // Restore the original mock behavior for other tests
     mockContainerProtocolCalls();
@@ -320,6 +336,11 @@ private static long assertUniqueChecksumCount(long containerID, Collection
     dnMap = datanodes.stream()
         .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
@@ -331,6 +352,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.GET_CONTAINER_CHECKSUM_INFO && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in getContainerChecksumInfo");
+          }
+
           return dnMap.get(dn).getChecksumInfo(containerID);
         });
 
@@ -342,6 +368,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.GET_BLOCK && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in getBlock");
+          }
+
           return dnMap.get(dn).getBlock(blockID);
         });
 
@@ -355,6 +386,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.READ_CHUNK && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in readChunk");
readChunk"); + } + return dnMap.get(dn).readChunk(blockId, chunkInfo, checksumValidators); }); @@ -427,8 +463,7 @@ public long checkAndGetDataChecksum(long containerID) { } catch (IOException ex) { fail("Failed to read container checksum from disk", ex); } - log.info("Retrieved data checksum {} from container {}", HddsUtils.checksumToString(dataChecksum), - containerID); + log.info("Retrieved data checksum {} from container {}", checksumToString(dataChecksum), containerID); return dataChecksum; } From dc66cb400fecc8670748e0a95f706b0a840297e7 Mon Sep 17 00:00:00 2001 From: Aswin Shakil Balasubramanian Date: Fri, 8 Aug 2025 12:58:35 -0700 Subject: [PATCH 3/5] Move updatedTreeWriter creation call. --- .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 1844f4070d15..af56a0665c55 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1586,10 +1586,6 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container long numMissingBlocksRepaired = 0; long numCorruptChunksRepaired = 0; long numMissingChunksRepaired = 0; - // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with - // the next peer. - ContainerMerkleTreeWriter updatedTreeWriter = - new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); @@ -1606,6 +1602,10 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container continue; } + // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with + // the next peer. 
+        ContainerMerkleTreeWriter updatedTreeWriter =
+            new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
         ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
         Pipeline pipeline = createSingleNodePipeline(peer);

From b7b3cd1db0f025f58f151b9605f5ca89b117f02d Mon Sep 17 00:00:00 2001
From: Aswin Shakil Balasubramanian
Date: Fri, 8 Aug 2025 13:24:00 -0700
Subject: [PATCH 4/5] Fix findbugs

---
 .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 4 ++--
 .../TestContainerReconciliationWithMockDatanodes.java    | 1 -
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index af56a0665c55..6716e0440469 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1694,9 +1694,9 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late
 
         ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
         successfulPeerCount++;
-      } catch (Exception e) {
+      } catch (IOException ex) {
         LOG.error("Failed to reconcile with peer {} for container #{}. Skipping to next peer.",
-            peer, containerID, e);
+            peer, containerID, ex);
       }
     }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
index f50e47c545f6..b541963dec9b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
@@ -268,7 +268,6 @@ public void testContainerReconciliationWithPeerFailure(FailureLocation failureLo
     MockDatanode corruptedNode = datanodes.get(0);
     MockDatanode healthyNode1 = datanodes.get(1);
     MockDatanode healthyNode2 = datanodes.get(2);
-    healthyDataChecksum = healthyNode1.checkAndGetDataChecksum(CONTAINER_ID);
     corruptedNode.introduceCorruption(CONTAINER_ID, 1, 1, false);
 
     // Use synchronous on-demand scans to re-build the merkle trees after corruption.
From ef4690cf4d8d4e8ccc79784d2dc9d37d45ba8b4f Mon Sep 17 00:00:00 2001
From: Aswin Shakil Balasubramanian
Date: Mon, 11 Aug 2025 10:00:48 -0700
Subject: [PATCH 5/5] update try catch

---
 .../container/keyvalue/KeyValueHandler.java | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 6716e0440469..d2f5aa37b01e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1611,23 +1611,23 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container
 
         // Handle missing blocks
         for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
-          long localID = missingBlock.getBlockID();
-          BlockID blockID = new BlockID(containerID, localID);
-          if (getBlockManager().blockExists(container, blockID)) {
-            LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
-                "present. Our container merkle tree is stale.", localID, containerID);
-          } else {
-            try {
+          try {
+            long localID = missingBlock.getBlockID();
+            BlockID blockID = new BlockID(containerID, localID);
+            if (getBlockManager().blockExists(container, blockID)) {
+              LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
+                  "present. Our container merkle tree is stale.", localID, containerID);
+            } else {
               long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
                   missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
               if (chunksInBlockRetrieved != 0) {
                 allBlocksUpdated.add(localID);
                 numMissingBlocksRepaired++;
               }
-            } catch (IOException e) {
-              LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
-                  containerID, e);
             }
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+                containerID, e);
           }
         }