diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index b9c80ce213b2..d2f5aa37b01e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1582,114 +1582,122 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container
     ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
     for (DatanodeDetails peer : peers) {
-      long numMissingBlocksRepaired = 0;
-      long numCorruptChunksRepaired = 0;
-      long numMissingChunksRepaired = 0;
-      // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the
-      // next peer.
-      ContainerMerkleTreeWriter updatedTreeWriter =
-          new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
-
-      LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
-          containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
-      // Data checksum updated after each peer reconciles.
-      long start = Instant.now().toEpochMilli();
-      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
-          containerID, peer);
-      if (peerChecksumInfo == null) {
-        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
-            containerID, peer);
-        continue;
-      }
+      try {
+        long numMissingBlocksRepaired = 0;
+        long numCorruptChunksRepaired = 0;
+        long numMissingChunksRepaired = 0;
+
+        LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
+            containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
+        // Data checksum updated after each peer reconciles.
+        long start = Instant.now().toEpochMilli();
+        ContainerProtos.ContainerChecksumInfo peerChecksumInfo;
+
+
+        // Data checksum updated after each peer reconciles.
+        peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID, peer);
+        if (peerChecksumInfo == null) {
+          LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+              containerID, peer);
+          continue;
+        }
 
-      ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
-      Pipeline pipeline = createSingleNodePipeline(peer);
+        // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with
+        // the next peer.
+        ContainerMerkleTreeWriter updatedTreeWriter =
+            new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
+        ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
+        Pipeline pipeline = createSingleNodePipeline(peer);
 
-      // Handle missing blocks
-      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
-        long localID = missingBlock.getBlockID();
-        BlockID blockID = new BlockID(containerID, localID);
-        if (getBlockManager().blockExists(container, blockID)) {
-          LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
-              "present. Our container merkle tree is stale.", localID, containerID);
-        } else {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
           try {
-            long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
-                missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
-            if (chunksInBlockRetrieved != 0) {
-              allBlocksUpdated.add(localID);
-              numMissingBlocksRepaired++;
+            long localID = missingBlock.getBlockID();
+            BlockID blockID = new BlockID(containerID, localID);
+            if (getBlockManager().blockExists(container, blockID)) {
+              LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
+                  "present. Our container merkle tree is stale.", localID, containerID);
+            } else {
+              long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
+                  missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
+              if (chunksInBlockRetrieved != 0) {
+                allBlocksUpdated.add(localID);
+                numMissingBlocksRepaired++;
+              }
            }
          } catch (IOException e) {
            LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
-              containerID, e);
+                containerID, e);
          }
        }
-      }
 
-      // Handle missing chunks
-      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
-        long localID = entry.getKey();
-        try {
-          long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
-              entry.getValue(), updatedTreeWriter, chunkByteBuffer);
-          if (missingChunksRepaired != 0) {
-            allBlocksUpdated.add(localID);
-            numMissingChunksRepaired += missingChunksRepaired;
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+          long localID = entry.getKey();
+          try {
+            long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
+                entry.getValue(), updatedTreeWriter, chunkByteBuffer);
+            if (missingChunksRepaired != 0) {
+              allBlocksUpdated.add(localID);
+              numMissingChunksRepaired += missingChunksRepaired;
+            }
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                containerID, e);
          }
-        } catch (IOException e) {
-          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
-              containerID, e);
        }
-      }
 
-      // Handle corrupt chunks
-      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
-        long localID = entry.getKey();
-        try {
-          long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
-              entry.getValue(), updatedTreeWriter, chunkByteBuffer);
-          if (corruptChunksRepaired != 0) {
-            allBlocksUpdated.add(localID);
-            numCorruptChunksRepaired += corruptChunksRepaired;
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+          long localID = entry.getKey();
+          try {
+            long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
+                entry.getValue(), updatedTreeWriter, chunkByteBuffer);
+            if (corruptChunksRepaired != 0) {
+              allBlocksUpdated.add(localID);
+              numCorruptChunksRepaired += corruptChunksRepaired;
+            }
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                containerID, e);
          }
-        } catch (IOException e) {
-          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
-              containerID, e);
        }
-      }
 
-      // Based on repaired done with this peer, write the updated merkle tree to the container.
-      // This updated tree will be used when we reconcile with the next peer.
-      ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
-      latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);
-
-      // Log the results of reconciliation with this peer.
-      long duration = Instant.now().toEpochMilli() - start;
-      long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
-      long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
-      if (previousDataChecksum == latestDataChecksum) {
-        if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
-          // This condition should never happen.
-          LOG.error("Checksum of container was not updated but blocks were repaired.");
+        // Based on repairs done with this peer, write the updated merkle tree to the container.
+        // This updated tree will be used when we reconcile with the next peer.
+        ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
+        latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);
+
+        // Log the results of reconciliation with this peer.
+        long duration = Instant.now().toEpochMilli() - start;
+        long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
+        long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
+        if (previousDataChecksum == latestDataChecksum) {
+          if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
+            // This condition should never happen.
+            LOG.error("Checksum of container was not updated but blocks were repaired.");
+          }
+          LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
+              containerID, peer, checksumToString(previousDataChecksum), duration);
+        } else {
+          LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
+              ".\nMissing blocks repaired: {}/{}\n" +
+              "Missing chunks repaired: {}/{}\n" +
+              "Corrupt chunks repaired: {}/{}\n" +
+              "Time taken: {} ms",
+              containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
+              numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
+              numMissingChunksRepaired, diffReport.getMissingChunks().size(),
+              numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
+              duration);
        }
-        LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
-            containerID, peer, checksumToString(previousDataChecksum), duration);
-      } else {
-        LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
-            ".\nMissing blocks repaired: {}/{}\n" +
-            "Missing chunks repaired: {}/{}\n" +
-            "Corrupt chunks repaired: {}/{}\n" +
-            "Time taken: {} ms",
-            containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
-            numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
-            numMissingChunksRepaired, diffReport.getMissingChunks().size(),
-            numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
-            duration);
-      }
-      ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
-      successfulPeerCount++;
+        ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
+        successfulPeerCount++;
+      } catch (IOException ex) {
+        LOG.error("Failed to reconcile with peer {} for container #{}. Skipping to next peer.",
+            peer, containerID, ex);
+      }
     }
 
     // Log a summary after reconciling with all peers.
@@ -1709,8 +1717,7 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late
     }
 
     // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data.
-    containerSet.scanContainerWithoutGap(containerID,
-        "Container reconciliation");
+    containerSet.scanContainerWithoutGap(containerID, "Container reconciliation");
 
     sendICR(container);
   }
@@ -1823,6 +1830,10 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
           chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
           writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
           localOffset2Chunk.put(chunkOffset, chunkInfoProto);
+          // Update the treeWriter to reflect the current state of blocks and chunks on disk after successful
+          // chunk writes. If putBlockForClosedContainer fails, the container scanner will later update the
+          // Merkle tree to resolve any discrepancies.
+          treeWriter.addChunks(localID, true, chunkInfoProto);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully ingested chunk at offset {} into block {} of container {} from peer {}",
                chunkOffset, localID, containerID, peer);
@@ -1846,7 +1857,6 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
     List<ContainerProtos.ChunkInfo> allChunks = new ArrayList<>(localOffset2Chunk.values());
     localBlockData.setChunks(allChunks);
     putBlockForClosedContainer(container, localBlockData, maxBcsId, allChunksSuccessful);
-    treeWriter.addChunks(localID, true, allChunks);
     // Invalidate the file handle cache, so new read requests get the new file if one was created.
     chunkManager.finishWriteChunks(container, localBlockData);
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
index d567e209d420..b541963dec9b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
@@ -59,7 +60,6 @@
 import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.text.RandomStringGenerator;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -196,7 +196,7 @@ public static void teardown() {
   @MethodSource("corruptionValues")
   public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception {
     LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID,
-        HddsUtils.checksumToString(healthyDataChecksum));
+        checksumToString(healthyDataChecksum));
 
     // Introduce corruption in each container on different replicas.
     List<MockDatanode> dnsToCorrupt = datanodes.stream().limit(2).collect(Collectors.toList());
@@ -228,6 +228,82 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo
     assertEquals(healthyDataChecksum, repairedDataChecksum);
   }
 
+  /**
+   * Enum to represent different failure modes for container protocol calls.
+   */
+  public enum FailureLocation {
+    GET_CONTAINER_CHECKSUM_INFO("getContainerChecksumInfo"),
+    GET_BLOCK("getBlock"),
+    READ_CHUNK("readChunk");
+
+    private final String methodName;
+
+    FailureLocation(String methodName) {
+      this.methodName = methodName;
+    }
+
+    public String getMethodName() {
+      return methodName;
+    }
+  }
+
+  /**
+   * Provides test parameters for different failure modes.
+   */
+  public static Stream<Arguments> failureLocations() {
+    return Stream.of(
+        Arguments.of(FailureLocation.GET_CONTAINER_CHECKSUM_INFO),
+        Arguments.of(FailureLocation.GET_BLOCK),
+        Arguments.of(FailureLocation.READ_CHUNK)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("failureLocations")
+  public void testContainerReconciliationWithPeerFailure(FailureLocation failureLocation) throws Exception {
+    LOG.info("Testing container reconciliation with peer failure in {} for container {}",
+        failureLocation.getMethodName(), CONTAINER_ID);
+
+    // Introduce corruption in the first datanode
+    MockDatanode corruptedNode = datanodes.get(0);
+    MockDatanode healthyNode1 = datanodes.get(1);
+    MockDatanode healthyNode2 = datanodes.get(2);
+    corruptedNode.introduceCorruption(CONTAINER_ID, 1, 1, false);
+
+    // Use synchronous on-demand scans to re-build the merkle trees after corruption.
+    datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));
+
+    // Without reconciliation, checksums should be different.
+    assertUniqueChecksumCount(CONTAINER_ID, datanodes, 2);
+    waitForExpectedScanCount(1);
+
+    // Create a failing peer - we'll make the second datanode fail during the specified operation
+    DatanodeDetails failingPeerDetails = healthyNode1.getDnDetails();
+    // Mock the failure for the specific method based on the failure mode
+    mockContainerProtocolCalls(failureLocation, failingPeerDetails);
+
+    // Now reconcile the corrupted node with its peers (including the failing one)
+    List<DatanodeDetails> peers = Arrays.asList(failingPeerDetails, healthyNode2.getDnDetails());
+    corruptedNode.reconcileContainer(dnClient, peers, CONTAINER_ID);
+
+    // Wait for scan to complete - but this time we only expect the corrupted node to have a scan
+    // triggered by reconciliation, so we wait specifically for that one
+    try {
+      GenericTestUtils.waitFor(() -> corruptedNode.getOnDemandScanCount() == 2, 100, 5_000);
+    } catch (TimeoutException ex) {
+      LOG.warn("Timed out waiting for on-demand scan after reconciliation. Current count: {}",
+          corruptedNode.getOnDemandScanCount());
+    }
+
+    // The corrupted node should still be repaired because it was able to reconcile with the healthy peer
+    // even though one peer failed
+    long repairedDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1);
+    assertEquals(healthyDataChecksum, repairedDataChecksum);
+
+    // Restore the original mock behavior for other tests
+    mockContainerProtocolCalls();
+  }
+
   /**
    * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on
    * every datanode.
@@ -259,6 +335,11 @@ private static long assertUniqueChecksumCount(long containerID, Collection
     Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
         .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
@@ -270,6 +351,11 @@ private static void mockContainerProtocolCalls() {
       Pipeline pipeline = xceiverClientSpi.getPipeline();
       assertEquals(1, pipeline.size());
       DatanodeDetails dn = pipeline.getFirstNode();
+
+      if (failureLocation == FailureLocation.GET_CONTAINER_CHECKSUM_INFO && dn.equals(failingPeerDetails)) {
+        throw new IOException("Simulated peer failure for testing in getContainerChecksumInfo");
+      }
+
       return dnMap.get(dn).getChecksumInfo(containerID);
     });
 
@@ -281,6 +367,11 @@
       Pipeline pipeline = xceiverClientSpi.getPipeline();
       assertEquals(1, pipeline.size());
       DatanodeDetails dn = pipeline.getFirstNode();
+
+      if (failureLocation == FailureLocation.GET_BLOCK && dn.equals(failingPeerDetails)) {
+        throw new IOException("Simulated peer failure for testing in getBlock");
+      }
+
       return dnMap.get(dn).getBlock(blockID);
     });
 
@@ -294,6 +385,11 @@
       Pipeline pipeline = xceiverClientSpi.getPipeline();
      assertEquals(1, pipeline.size());
      DatanodeDetails dn = pipeline.getFirstNode();
+
+      if (failureLocation == FailureLocation.READ_CHUNK && dn.equals(failingPeerDetails)) {
+        throw new IOException("Simulated peer failure for testing in readChunk");
+      }
+
      return dnMap.get(dn).readChunk(blockId, chunkInfo, checksumValidators);
    });
 
@@ -366,8 +462,7 @@ public long checkAndGetDataChecksum(long containerID) {
      } catch (IOException ex) {
        fail("Failed to read container checksum from disk", ex);
      }
-      log.info("Retrieved data checksum {} from container {}", HddsUtils.checksumToString(dataChecksum),
-          containerID);
+      log.info("Retrieved data checksum {} from container {}", checksumToString(dataChecksum), containerID);
      return dataChecksum;
    }