
Conversation

@aswinshakil
Member

What changes were proposed in this pull request?

When reconciling a container with all peers, we should continue on to the next peer if we cannot reach one peer, instead of failing reconciliation.
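
As a rough illustration of the intended behavior (a minimal sketch with placeholder names, not the actual KeyValueHandler code), each peer gets its own try/catch so an unreachable peer is logged and skipped instead of failing the whole reconciliation:

import java.io.IOException;
import java.util.List;

class PerPeerReconcileSketch {
  interface Peer {
    String name();
  }

  void reconcileContainer(long containerId, List<Peer> peers) {
    for (Peer peer : peers) {
      try {
        // Network calls against this peer: checksum info, getBlock, readChunk, etc.
        reconcileWithPeer(containerId, peer);
      } catch (IOException e) {
        // Before this change the exception aborted reconciliation; now we log and
        // continue with the remaining peers.
        System.err.printf("Skipping unreachable peer %s for container %d: %s%n",
            peer.name(), containerId, e.getMessage());
      }
    }
  }

  void reconcileWithPeer(long containerId, Peer peer) throws IOException {
    // Placeholder for the real per-peer repair work.
  }
}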

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-13519

How was this patch tested?

Added a unit test for reconciliation when a peer is unreachable.

Contributor

@errose28 errose28 left a comment


We need to parameterize this test to check what happens when each network call fails, not just the first one. Here is one way to do that; it was mostly generated by Cursor, but it looks like a good approach:

diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
index b0409fae21..4f82f0bb67 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
@@ -89,7 +90,6 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -197,7 +197,7 @@ public static void teardown() {
   @MethodSource("corruptionValues")
   public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception {
     LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID,
-        HddsUtils.checksumToString(healthyDataChecksum));
+        checksumToString(healthyDataChecksum));
     // Introduce corruption in each container on different replicas.
     List<MockDatanode> dnsToCorrupt = datanodes.stream().limit(2).collect(Collectors.toList());
 
@@ -229,9 +229,42 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo
     assertEquals(healthyDataChecksum, repairedDataChecksum);
   }
 
-  @Test
-  public void testContainerReconciliationWithPeerFailure() throws Exception {
-    LOG.info("Testing container reconciliation with peer failure for container {}", CONTAINER_ID);
+  /**
+   * Enum to represent different failure modes for container protocol calls.
+   */
+  public enum FailureLocation {
+    GET_CONTAINER_CHECKSUM_INFO("getContainerChecksumInfo"),
+    GET_BLOCK("getBlock"), 
+    READ_CHUNK("readChunk");
+    
+    private final String methodName;
+    
+    FailureLocation(String methodName) {
+      this.methodName = methodName;
+    }
+    
+    public String getMethodName() {
+      return methodName;
+    }
+  }
+
+  /**
+   * Provides test parameters for different failure modes.
+   */
+  public static Stream<Arguments> failureLocations() {
+    return Stream.of(
+        Arguments.of(FailureLocation.GET_CONTAINER_CHECKSUM_INFO),
+        Arguments.of(FailureLocation.GET_BLOCK),
+        Arguments.of(FailureLocation.READ_CHUNK)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("failureLocations")
+  public void testContainerReconciliationWithPeerFailure(FailureLocation failureLocation) throws Exception {
+    LOG.info("Testing container reconciliation with peer failure in {} for container {}", 
+        failureLocation.getMethodName(), CONTAINER_ID);
+    
     // Introduce corruption in the first datanode
     MockDatanode corruptedNode = datanodes.get(0);
     MockDatanode healthyNode1 = datanodes.get(1);
@@ -245,26 +278,10 @@ public void testContainerReconciliationWithPeerFailure() throws Exception {
     assertUniqueChecksumCount(CONTAINER_ID, datanodes, 2);
     waitForExpectedScanCount(1);
     
-    // Create a failing peer - we'll make the second datanode fail during getContainerChecksumInfo
+    // Create a failing peer - we'll make the second datanode fail during the specified operation
     DatanodeDetails failingPeerDetails = healthyNode1.getDnDetails();
-    Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
-        .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
-        
-    containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
-        .thenAnswer(inv -> {
-          XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
-          long containerID = inv.getArgument(1);
-          Pipeline pipeline = xceiverClientSpi.getPipeline();
-          assertEquals(1, pipeline.size());
-          DatanodeDetails dn = pipeline.getFirstNode();
-          
-          // Throw exception for the specific failing peer
-          if (dn.equals(failingPeerDetails)) {
-            throw new IOException("Simulated peer failure for testing");
-          }
-          
-          return dnMap.get(dn).getChecksumInfo(containerID);
-        });
+    // Mock the failure for the specific method based on the failure mode
+    mockContainerProtocolCalls(failureLocation, failingPeerDetails);
     
     // Now reconcile the corrupted node with its peers (including the failing one)
     List<DatanodeDetails> peers = Arrays.asList(failingPeerDetails, healthyNode2.getDnDetails());
@@ -283,7 +300,7 @@ public void testContainerReconciliationWithPeerFailure() throws Exception {
     // even though one peer failed
     long corruptedChecksum = corruptedNode.checkAndGetDataChecksum(CONTAINER_ID);
     long healthyChecksum = healthyNode2.checkAndGetDataChecksum(CONTAINER_ID);
-    assertEquals(healthyChecksum, corruptedChecksum);
+    assertEquals(checksumToString(healthyChecksum), checksumToString(corruptedChecksum));
     
     // Restore the original mock behavior for other tests
     mockContainerProtocolCalls();
@@ -320,6 +337,11 @@ private static long assertUniqueChecksumCount(long containerID, Collection<MockD
   }
 
   private static void mockContainerProtocolCalls() {
+    // Mock network calls without injecting failures.
+    mockContainerProtocolCalls(null, null);
+  }
+
+  private static void mockContainerProtocolCalls(FailureLocation failureLocation, DatanodeDetails failingPeerDetails) {
     Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
         .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
 
@@ -331,6 +353,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.GET_CONTAINER_CHECKSUM_INFO && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in getContainerChecksumInfo");
+          }
+
           return dnMap.get(dn).getChecksumInfo(containerID);
         });
 
@@ -342,6 +369,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.GET_BLOCK && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in getBlock");
+          }
+
           return dnMap.get(dn).getBlock(blockID);
         });
 
@@ -355,6 +387,11 @@ private static void mockContainerProtocolCalls() {
           Pipeline pipeline = xceiverClientSpi.getPipeline();
           assertEquals(1, pipeline.size());
           DatanodeDetails dn = pipeline.getFirstNode();
+
+          if (failureLocation == FailureLocation.READ_CHUNK && dn.equals(failingPeerDetails)) {
+            throw new IOException("Simulated peer failure for testing in readChunk");
+          }
+
           return dnMap.get(dn).readChunk(blockId, chunkInfo, checksumValidators);
         });
 
@@ -427,7 +464,7 @@ public long checkAndGetDataChecksum(long containerID) {
       } catch (IOException ex) {
         fail("Failed to read container checksum from disk", ex);
       }
-      log.info("Retrieved data checksum {} from container {}", HddsUtils.checksumToString(dataChecksum),
+      log.info("Retrieved data checksum {} from container {}", checksumToString(dataChecksum),
           containerID);
       return dataChecksum;
     }

The readChunk test is actually failing with corruption. This looks like a bug in the reconciliation process that we need to fix here.

@aswinshakil
Member Author

@errose28 The above-mentioned test cases look great. I have added them to this PR and also fixed the bug we discussed offline.

Contributor

@errose28 errose28 left a comment


Mostly looks good. Just one suggestion while we are looking at try/catch placement: if we fail to check our local block data for a given block, we should still try the rest of the repairs with this peer, so we should adjust this try/catch:

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 6716e04404..f1fba460f7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1611,25 +1611,25 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
 
         // Handle missing blocks
         for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+          try {
             long localID = missingBlock.getBlockID();
             BlockID blockID = new BlockID(containerID, localID);
             if (getBlockManager().blockExists(container, blockID)) {
               LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
                   "present. Our container merkle tree is stale.", localID, containerID);
             } else {
-            try {
                 long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
                     missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
                 if (chunksInBlockRetrieved != 0) {
                   allBlocksUpdated.add(localID);
                   numMissingBlocksRepaired++;
                 }
+            }
           } catch (IOException e) {
             LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
                 containerID, e);
           }
         }
-        }
 
         // Handle missing chunks
         for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {

Contributor

@errose28 errose28 left a comment


LGTM, thanks for the fix @aswinshakil.

@errose28 errose28 merged commit 54f2649 into apache:master Aug 11, 2025
42 checks passed
errose28 added a commit to errose28/ozone that referenced this pull request Aug 12, 2025
* master: (55 commits)
  HDDS-13525. Rename configuration property to ozone.om.compaction.service.enabled (apache#8928)
  HDDS-13519. Reconciliation should continue if a peer datanode is unreachable (apache#8908)
  HDDS-13566. Fix incorrect authorizer class in ACL documentation (apache#8931)
  HDDS-13084. Trigger on-demand container scan when a container moves from open to unhealthy. (apache#8904)
  HDDS-13432. Accelerating Namespace Usage Calculation in Recon using - Materialised Approach (apache#8797)
  HDDS-13557. Bump jline to 3.30.5 (apache#8920)
  HDDS-13556. Bump assertj-core to 3.27.4 (apache#8919)
  HDDS-13543. [Docs] Design doc for OM bootstrapping process with snapshots. (apache#8900)
  HDDS-13541. Bump sonar-maven-plugin to 5.1.0.4751 (apache#8911)
  HDDS-13101. Remove duplicate information in datanode list output (apache#8523)
  HDDS-13528. Handle null paths when the NSSummary is initializing (apache#8901)
  HDDS-12990. (addendum) Generate tree from metadata when it does not exist during getContainerChecksumInfo call (apache#8881)
  HDDS-13086. Block duplicate reconciliation requests for the same container and datanode within the datanode. (apache#8905)
  HDDS-12990. Generate tree from metadata when it doesn't exist during getContainerChecksumInfo call (apache#8881)
  HDDS-12824. Optimize container checksum read during datanode startup (apache#8604)
  HDDS-13522. Rename axisLabel for No. of delete request received (apache#8879)
  HDDS-12196. Document ozone repair cli (apache#8849)
  HDDS-13514. Intermittent failure in TestNSSummaryMemoryLeak (apache#8889)
  HDDS-13423. Log reason for triggering on-demand container scan (apache#8854)
  HDDS-13466. Disable flaky TestOmSnapshotFsoWithNativeLibWithLinkedBuckets
  ...
